diff --git a/cmd/cloudflared/tunnel/configuration.go b/cmd/cloudflared/tunnel/configuration.go index 08194345..e6c05d67 100644 --- a/cmd/cloudflared/tunnel/configuration.go +++ b/cmd/cloudflared/tunnel/configuration.go @@ -30,7 +30,10 @@ import ( tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs" ) -const secretValue = "*****" +const ( + secretValue = "*****" + icmpFunnelTimeout = time.Second * 10 +) var ( developerPortal = "https://developers.cloudflare.com/cloudflare-one/connections/connect-apps/install-and-setup" @@ -361,7 +364,7 @@ func newPacketConfig(c *cli.Context, logger *zerolog.Logger) (*ingress.GlobalRou logger.Info().Msgf("ICMP proxy will use %s as source for IPv6", ipv6Src) } - icmpRouter, err := ingress.NewICMPRouter(ipv4Src, ipv6Src, zone, logger) + icmpRouter, err := ingress.NewICMPRouter(ipv4Src, ipv6Src, zone, logger, icmpFunnelTimeout) if err != nil { return nil, err } diff --git a/go.mod b/go.mod index feead7cd..4cefc105 100644 --- a/go.mod +++ b/go.mod @@ -60,6 +60,7 @@ require ( github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect github.com/facebookgo/subset v0.0.0-20150612182917-8dac2c3c4870 // indirect github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 // indirect + github.com/fortytw2/leaktest v1.3.0 // indirect github.com/go-logr/logr v1.3.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect diff --git a/go.sum b/go.sum index bebebca5..aab4cc86 100644 --- a/go.sum +++ b/go.sum @@ -88,6 +88,8 @@ github.com/facebookgo/subset v0.0.0-20150612182917-8dac2c3c4870 h1:E2s37DuLxFhQD github.com/facebookgo/subset v0.0.0-20150612182917-8dac2c3c4870/go.mod h1:5tD+neXqOorC30/tWg0LCSkrqj/AR6gu8yY8/fpw1q0= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 h1:BHsljHzVlRcyQhjrss6TZTdY2VfCqZPbv5k3iBFa2ZQ= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/getsentry/sentry-go v0.16.0 h1:owk+S+5XcgJLlGR/3+3s6N4d+uKwqYvh/eS0AIMjPWo= diff --git a/ingress/icmp_darwin.go b/ingress/icmp_darwin.go index 8c74598f..4e315f15 100644 --- a/ingress/icmp_darwin.go +++ b/ingress/icmp_darwin.go @@ -131,7 +131,7 @@ func newICMPProxy(listenIP netip.Addr, zone string, logger *zerolog.Logger, idle } func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *packetResponder) error { - ctx, span := responder.requestSpan(ctx, pk) + _, span := responder.requestSpan(ctx, pk) defer responder.exportSpan() originalEcho, err := getICMPEcho(pk.Message) @@ -139,10 +139,8 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa tracing.EndWithErrorStatus(span, err) return err } - span.SetAttributes( - attribute.Int("originalEchoID", originalEcho.ID), - attribute.Int("seq", originalEcho.Seq), - ) + observeICMPRequest(ip.logger, span, pk.Src.String(), pk.Dst.String(), originalEcho.ID, originalEcho.Seq) + echoIDTrackerKey := flow3Tuple{ srcIP: pk.Src, dstIP: pk.Dst, @@ -189,6 +187,7 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa tracing.EndWithErrorStatus(span, err) return err } + err = icmpFlow.sendToDst(pk.Dst, pk.Message) if err != nil { tracing.EndWithErrorStatus(span, err) @@ -269,15 +268,12 @@ func (ip *icmpProxy) sendReply(ctx context.Context, reply *echoReply) error { _, span := icmpFlow.responder.replySpan(ctx, ip.logger) defer icmpFlow.responder.exportSpan() - span.SetAttributes( - attribute.String("dst", reply.from.String()), - attribute.Int("echoID", reply.echo.ID), - attribute.Int("seq", reply.echo.Seq), - attribute.Int("originalEchoID", icmpFlow.originalEchoID), - ) if err := icmpFlow.returnToSrc(reply); err != nil { tracing.EndWithErrorStatus(span, err) + return err } + observeICMPReply(ip.logger, span, reply.from.String(), reply.echo.ID, reply.echo.Seq) + span.SetAttributes(attribute.Int("originalEchoID", icmpFlow.originalEchoID)) tracing.End(span) return nil } diff --git a/ingress/icmp_linux.go b/ingress/icmp_linux.go index 321b5c3f..8b8b0208 100644 --- a/ingress/icmp_linux.go +++ b/ingress/icmp_linux.go @@ -107,10 +107,7 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa tracing.EndWithErrorStatus(span, err) return err } - span.SetAttributes( - attribute.Int("originalEchoID", originalEcho.ID), - attribute.Int("seq", originalEcho.Seq), - ) + observeICMPRequest(ip.logger, span, pk.Src.String(), pk.Dst.String(), originalEcho.ID, originalEcho.Seq) shouldReplaceFunnelFunc := createShouldReplaceFunnelFunc(ip.logger, responder.datagramMuxer, pk, originalEcho.ID) newFunnelFunc := func() (packet.Funnel, error) { @@ -156,14 +153,8 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa Int("originalEchoID", originalEcho.ID). Msg("New flow") go func() { - defer ip.srcFunnelTracker.Unregister(funnelID, icmpFlow) - if err := ip.listenResponse(ctx, icmpFlow); err != nil { - ip.logger.Debug().Err(err). - Str("src", pk.Src.String()). - Str("dst", pk.Dst.String()). - Int("originalEchoID", originalEcho.ID). - Msg("Failed to listen for ICMP echo response") - } + ip.listenResponse(ctx, icmpFlow) + ip.srcFunnelTracker.Unregister(funnelID, icmpFlow) }() } if err := icmpFlow.sendToDst(pk.Dst, pk.Message); err != nil { @@ -179,17 +170,17 @@ func (ip *icmpProxy) Serve(ctx context.Context) error { return ctx.Err() } -func (ip *icmpProxy) listenResponse(ctx context.Context, flow *icmpEchoFlow) error { +func (ip *icmpProxy) listenResponse(ctx context.Context, flow *icmpEchoFlow) { buf := make([]byte, mtu) for { - retryable, err := ip.handleResponse(ctx, flow, buf) - if err != nil && !retryable { - return err + if done := ip.handleResponse(ctx, flow, buf); done { + return } } } -func (ip *icmpProxy) handleResponse(ctx context.Context, flow *icmpEchoFlow, buf []byte) (retryableErr bool, err error) { +// Listens for ICMP response and handles error logging +func (ip *icmpProxy) handleResponse(ctx context.Context, flow *icmpEchoFlow, buf []byte) (done bool) { _, span := flow.responder.replySpan(ctx, ip.logger) defer flow.responder.exportSpan() @@ -199,33 +190,36 @@ func (ip *icmpProxy) handleResponse(ctx context.Context, flow *icmpEchoFlow, buf n, from, err := flow.originConn.ReadFrom(buf) if err != nil { + if flow.IsClosed() { + tracing.EndWithErrorStatus(span, fmt.Errorf("flow was closed")) + return true + } + ip.logger.Error().Err(err).Str("socket", flow.originConn.LocalAddr().String()).Msg("Failed to read from ICMP socket") tracing.EndWithErrorStatus(span, err) - return false, err + return true } reply, err := parseReply(from, buf[:n]) if err != nil { ip.logger.Error().Err(err).Str("dst", from.String()).Msg("Failed to parse ICMP reply") tracing.EndWithErrorStatus(span, err) - return true, err + return false } if !isEchoReply(reply.msg) { err := fmt.Errorf("Expect ICMP echo reply, got %s", reply.msg.Type) ip.logger.Debug().Str("dst", from.String()).Msgf("Drop ICMP %s from reply", reply.msg.Type) tracing.EndWithErrorStatus(span, err) - return true, err + return false } - span.SetAttributes( - attribute.String("dst", reply.from.String()), - attribute.Int("echoID", reply.echo.ID), - attribute.Int("seq", reply.echo.Seq), - ) + if err := flow.returnToSrc(reply); err != nil { - ip.logger.Debug().Err(err).Str("dst", from.String()).Msg("Failed to send ICMP reply") + ip.logger.Error().Err(err).Str("dst", from.String()).Msg("Failed to send ICMP reply") tracing.EndWithErrorStatus(span, err) - return true, err + return false } + + observeICMPReply(ip.logger, span, from.String(), reply.echo.ID, reply.echo.Seq) tracing.End(span) - return true, nil + return false } // Only linux uses flow3Tuple as FunnelID diff --git a/ingress/icmp_posix.go b/ingress/icmp_posix.go index 504df60a..b03be49e 100644 --- a/ingress/icmp_posix.go +++ b/ingress/icmp_posix.go @@ -8,6 +8,7 @@ import ( "fmt" "net" "net/netip" + "sync/atomic" "github.com/google/gopacket/layers" "github.com/rs/zerolog" @@ -46,6 +47,7 @@ type flow3Tuple struct { type icmpEchoFlow struct { *packet.ActivityTracker closeCallback func() error + closed *atomic.Bool src netip.Addr originConn *icmp.PacketConn responder *packetResponder @@ -59,6 +61,7 @@ func newICMPEchoFlow(src netip.Addr, closeCallback func() error, originConn *icm return &icmpEchoFlow{ ActivityTracker: packet.NewActivityTracker(), closeCallback: closeCallback, + closed: &atomic.Bool{}, src: src, originConn: originConn, responder: responder, @@ -86,9 +89,14 @@ func (ief *icmpEchoFlow) Equal(other packet.Funnel) bool { } func (ief *icmpEchoFlow) Close() error { + ief.closed.Store(true) return ief.closeCallback() } +func (ief *icmpEchoFlow) IsClosed() bool { + return ief.closed.Load() +} + // sendToDst rewrites the echo ID to the one assigned to this flow func (ief *icmpEchoFlow) sendToDst(dst netip.Addr, msg *icmp.Message) error { ief.UpdateLastActive() diff --git a/ingress/icmp_posix_test.go b/ingress/icmp_posix_test.go index 79c6c89a..ea8dc7e9 100644 --- a/ingress/icmp_posix_test.go +++ b/ingress/icmp_posix_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/fortytw2/leaktest" "github.com/google/gopacket/layers" "github.com/rs/zerolog" "github.com/stretchr/testify/require" @@ -18,6 +19,8 @@ import ( ) func TestFunnelIdleTimeout(t *testing.T) { + defer leaktest.Check(t)() + const ( idleTimeout = time.Second echoID = 42573 @@ -73,13 +76,16 @@ func TestFunnelIdleTimeout(t *testing.T) { require.NoError(t, proxy.Request(ctx, &pk, &newResponder)) validateEchoFlow(t, <-newMuxer.cfdToEdge, &pk) + time.Sleep(idleTimeout * 2) cancel() <-proxyDone } func TestReuseFunnel(t *testing.T) { + defer leaktest.Check(t)() + const ( - idleTimeout = time.Second + idleTimeout = time.Millisecond * 100 echoID = 42573 startSeq = 8129 ) @@ -135,6 +141,8 @@ func TestReuseFunnel(t *testing.T) { require.True(t, found) require.Equal(t, funnel1, funnel2) + time.Sleep(idleTimeout * 2) + cancel() <-proxyDone } diff --git a/ingress/icmp_windows.go b/ingress/icmp_windows.go index 816ed383..19604ee4 100644 --- a/ingress/icmp_windows.go +++ b/ingress/icmp_windows.go @@ -281,10 +281,7 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa if err != nil { return err } - requestSpan.SetAttributes( - attribute.Int("originalEchoID", echo.ID), - attribute.Int("seq", echo.Seq), - ) + observeICMPRequest(ip.logger, requestSpan, pk.Src.String(), pk.Dst.String(), echo.ID, echo.Seq) resp, err := ip.icmpEchoRoundtrip(pk.Dst, echo) if err != nil { @@ -296,17 +293,17 @@ func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder *pa responder.exportSpan() _, replySpan := responder.replySpan(ctx, ip.logger) - replySpan.SetAttributes( - attribute.Int("originalEchoID", echo.ID), - attribute.Int("seq", echo.Seq), - attribute.Int64("rtt", int64(resp.rtt())), - attribute.String("status", resp.status().String()), - ) err = ip.handleEchoReply(pk, echo, resp, responder) if err != nil { + ip.logger.Err(err).Msg("Failed to send ICMP reply") tracing.EndWithErrorStatus(replySpan, err) return errors.Wrap(err, "failed to handle ICMP echo reply") } + observeICMPReply(ip.logger, replySpan, pk.Dst.String(), echo.ID, echo.Seq) + replySpan.SetAttributes( + attribute.Int64("rtt", int64(resp.rtt())), + attribute.String("status", resp.status().String()), + ) tracing.End(replySpan) return nil } diff --git a/ingress/origin_icmp_proxy.go b/ingress/origin_icmp_proxy.go index 466608b3..49c25dff 100644 --- a/ingress/origin_icmp_proxy.go +++ b/ingress/origin_icmp_proxy.go @@ -7,6 +7,8 @@ import ( "time" "github.com/rs/zerolog" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "golang.org/x/net/icmp" "golang.org/x/net/ipv4" "golang.org/x/net/ipv6" @@ -15,9 +17,7 @@ import ( ) const ( - // funnelIdleTimeout controls how long to wait to close a funnel without send/return - funnelIdleTimeout = time.Second * 10 - mtu = 1500 + mtu = 1500 // icmpRequestTimeoutMs controls how long to wait for a reply icmpRequestTimeoutMs = 1000 ) @@ -32,8 +32,9 @@ type icmpRouter struct { } // NewICMPRouter doesn't return an error if either ipv4 proxy or ipv6 proxy can be created. The machine might only -// support one of them -func NewICMPRouter(ipv4Addr, ipv6Addr netip.Addr, ipv6Zone string, logger *zerolog.Logger) (*icmpRouter, error) { +// support one of them. +// funnelIdleTimeout controls how long to wait to close a funnel without send/return +func NewICMPRouter(ipv4Addr, ipv6Addr netip.Addr, ipv6Zone string, logger *zerolog.Logger, funnelIdleTimeout time.Duration) (*icmpRouter, error) { ipv4Proxy, ipv4Err := newICMPProxy(ipv4Addr, "", logger, funnelIdleTimeout) ipv6Proxy, ipv6Err := newICMPProxy(ipv6Addr, ipv6Zone, logger, funnelIdleTimeout) if ipv4Err != nil && ipv6Err != nil { @@ -102,3 +103,25 @@ func getICMPEcho(msg *icmp.Message) (*icmp.Echo, error) { func isEchoReply(msg *icmp.Message) bool { return msg.Type == ipv4.ICMPTypeEchoReply || msg.Type == ipv6.ICMPTypeEchoReply } + +func observeICMPRequest(logger *zerolog.Logger, span trace.Span, src string, dst string, echoID int, seq int) { + logger.Debug(). + Str("src", src). + Str("dst", dst). + Int("originalEchoID", echoID). + Int("originalEchoSeq", seq). + Msg("Received ICMP request") + span.SetAttributes( + attribute.Int("originalEchoID", echoID), + attribute.Int("seq", seq), + ) +} + +func observeICMPReply(logger *zerolog.Logger, span trace.Span, dst string, echoID int, seq int) { + logger.Debug().Str("dst", dst).Int("echoID", echoID).Int("seq", seq).Msg("Sent ICMP reply to edge") + span.SetAttributes( + attribute.String("dst", dst), + attribute.Int("echoID", echoID), + attribute.Int("seq", seq), + ) +} diff --git a/ingress/origin_icmp_proxy_test.go b/ingress/origin_icmp_proxy_test.go index 268aaf4a..6c7b2f8c 100644 --- a/ingress/origin_icmp_proxy_test.go +++ b/ingress/origin_icmp_proxy_test.go @@ -9,7 +9,9 @@ import ( "strings" "sync" "testing" + "time" + "github.com/fortytw2/leaktest" "github.com/google/gopacket/layers" "github.com/rs/zerolog" "github.com/stretchr/testify/require" @@ -23,9 +25,10 @@ import ( ) var ( - noopLogger = zerolog.Nop() - localhostIP = netip.MustParseAddr("127.0.0.1") - localhostIPv6 = netip.MustParseAddr("::1") + noopLogger = zerolog.Nop() + localhostIP = netip.MustParseAddr("127.0.0.1") + localhostIPv6 = netip.MustParseAddr("::1") + testFunnelIdleTimeout = time.Millisecond * 10 ) // TestICMPProxyEcho makes sure we can send ICMP echo via the Request method and receives response via the @@ -40,12 +43,14 @@ func TestICMPRouterEcho(t *testing.T) { } func testICMPRouterEcho(t *testing.T, sendIPv4 bool) { + defer leaktest.Check(t)() + const ( echoID = 36571 endSeq = 20 ) - router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger) + router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger, testFunnelIdleTimeout) require.NoError(t, err) proxyDone := make(chan struct{}) @@ -97,14 +102,19 @@ func testICMPRouterEcho(t *testing.T, sendIPv4 bool) { validateEchoFlow(t, <-muxer.cfdToEdge, &pk) } } + + // Make sure funnel cleanup kicks in + time.Sleep(testFunnelIdleTimeout * 2) cancel() <-proxyDone } func TestTraceICMPRouterEcho(t *testing.T) { + defer leaktest.Check(t)() + tracingCtx := "ec31ad8a01fde11fdcabe2efdce36873:52726f6cabc144f5:0:1" - router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger) + router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger, testFunnelIdleTimeout) require.NoError(t, err) proxyDone := make(chan struct{}) @@ -196,6 +206,7 @@ func TestTraceICMPRouterEcho(t *testing.T) { default: } + time.Sleep(testFunnelIdleTimeout * 2) cancel() <-proxyDone } @@ -203,12 +214,14 @@ func TestTraceICMPRouterEcho(t *testing.T) { // TestConcurrentRequests makes sure icmpRouter can send concurrent requests to the same destination with different // echo ID. This simulates concurrent ping to the same destination. func TestConcurrentRequestsToSameDst(t *testing.T) { + defer leaktest.Check(t)() + const ( concurrentPings = 5 endSeq = 5 ) - router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger) + router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger, testFunnelIdleTimeout) require.NoError(t, err) proxyDone := make(chan struct{}) @@ -282,12 +295,16 @@ func TestConcurrentRequestsToSameDst(t *testing.T) { }() } wg.Wait() + + time.Sleep(testFunnelIdleTimeout * 2) cancel() <-proxyDone } // TestICMPProxyRejectNotEcho makes sure it rejects messages other than echo func TestICMPRouterRejectNotEcho(t *testing.T) { + defer leaktest.Check(t)() + msgs := []icmp.Message{ { Type: ipv4.ICMPTypeDestinationUnreachable, @@ -341,7 +358,7 @@ func TestICMPRouterRejectNotEcho(t *testing.T) { } func testICMPRouterRejectNotEcho(t *testing.T, srcDstIP netip.Addr, msgs []icmp.Message) { - router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger) + router, err := NewICMPRouter(localhostIP, localhostIPv6, "", &noopLogger, testFunnelIdleTimeout) require.NoError(t, err) muxer := newMockMuxer(1) diff --git a/vendor/github.com/fortytw2/leaktest/.travis.yml b/vendor/github.com/fortytw2/leaktest/.travis.yml new file mode 100644 index 00000000..5a791be1 --- /dev/null +++ b/vendor/github.com/fortytw2/leaktest/.travis.yml @@ -0,0 +1,16 @@ +language: go +go: + - 1.8 + - 1.9 + - "1.10" + - "1.11" + - tip + +script: + - go test -v -race -parallel 5 -coverprofile=coverage.txt -covermode=atomic ./ + - go test github.com/fortytw2/leaktest -run ^TestEmptyLeak$ + +before_install: + - pip install --user codecov +after_success: + - codecov diff --git a/vendor/github.com/fortytw2/leaktest/LICENSE b/vendor/github.com/fortytw2/leaktest/LICENSE new file mode 100644 index 00000000..74487567 --- /dev/null +++ b/vendor/github.com/fortytw2/leaktest/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2012 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/fortytw2/leaktest/README.md b/vendor/github.com/fortytw2/leaktest/README.md new file mode 100644 index 00000000..82822bf0 --- /dev/null +++ b/vendor/github.com/fortytw2/leaktest/README.md @@ -0,0 +1,64 @@ +## Leaktest [![Build Status](https://travis-ci.org/fortytw2/leaktest.svg?branch=master)](https://travis-ci.org/fortytw2/leaktest) [![codecov](https://codecov.io/gh/fortytw2/leaktest/branch/master/graph/badge.svg)](https://codecov.io/gh/fortytw2/leaktest) [![Sourcegraph](https://sourcegraph.com/github.com/fortytw2/leaktest/-/badge.svg)](https://sourcegraph.com/github.com/fortytw2/leaktest?badge) [![Documentation](https://godoc.org/github.com/fortytw2/gpt?status.svg)](http://godoc.org/github.com/fortytw2/leaktest) + +Refactored, tested variant of the goroutine leak detector found in both +`net/http` tests and the `cockroachdb` source tree. + +Takes a snapshot of running goroutines at the start of a test, and at the end - +compares the two and _voila_. Ignores runtime/sys goroutines. Doesn't play nice +with `t.Parallel()` right now, but there are plans to do so. + +### Installation + +Go 1.7+ + +``` +go get -u github.com/fortytw2/leaktest +``` + +Go 1.5/1.6 need to use the tag `v1.0.0`, as newer versions depend on +`context.Context`. + +### Example + +These tests fail, because they leak a goroutine + +```go +// Default "Check" will poll for 5 seconds to check that all +// goroutines are cleaned up +func TestPool(t *testing.T) { + defer leaktest.Check(t)() + + go func() { + for { + time.Sleep(time.Second) + } + }() +} + +// Helper function to timeout after X duration +func TestPoolTimeout(t *testing.T) { + defer leaktest.CheckTimeout(t, time.Second)() + + go func() { + for { + time.Sleep(time.Second) + } + }() +} + +// Use Go 1.7+ context.Context for cancellation +func TestPoolContext(t *testing.T) { + ctx, _ := context.WithTimeout(context.Background(), time.Second) + defer leaktest.CheckContext(ctx, t)() + + go func() { + for { + time.Sleep(time.Second) + } + }() +} +``` + +## LICENSE + +Same BSD-style as Go, see LICENSE diff --git a/vendor/github.com/fortytw2/leaktest/leaktest.go b/vendor/github.com/fortytw2/leaktest/leaktest.go new file mode 100644 index 00000000..219e9307 --- /dev/null +++ b/vendor/github.com/fortytw2/leaktest/leaktest.go @@ -0,0 +1,153 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package leaktest provides tools to detect leaked goroutines in tests. +// To use it, call "defer leaktest.Check(t)()" at the beginning of each +// test that may use goroutines. +// copied out of the cockroachdb source tree with slight modifications to be +// more re-useable +package leaktest + +import ( + "context" + "fmt" + "runtime" + "sort" + "strconv" + "strings" + "time" +) + +type goroutine struct { + id uint64 + stack string +} + +type goroutineByID []*goroutine + +func (g goroutineByID) Len() int { return len(g) } +func (g goroutineByID) Less(i, j int) bool { return g[i].id < g[j].id } +func (g goroutineByID) Swap(i, j int) { g[i], g[j] = g[j], g[i] } + +func interestingGoroutine(g string) (*goroutine, error) { + sl := strings.SplitN(g, "\n", 2) + if len(sl) != 2 { + return nil, fmt.Errorf("error parsing stack: %q", g) + } + stack := strings.TrimSpace(sl[1]) + if strings.HasPrefix(stack, "testing.RunTests") { + return nil, nil + } + + if stack == "" || + // Ignore HTTP keep alives + strings.Contains(stack, ").readLoop(") || + strings.Contains(stack, ").writeLoop(") || + // Below are the stacks ignored by the upstream leaktest code. + strings.Contains(stack, "testing.Main(") || + strings.Contains(stack, "testing.(*T).Run(") || + strings.Contains(stack, "runtime.goexit") || + strings.Contains(stack, "created by runtime.gc") || + strings.Contains(stack, "interestingGoroutines") || + strings.Contains(stack, "runtime.MHeap_Scavenger") || + strings.Contains(stack, "signal.signal_recv") || + strings.Contains(stack, "sigterm.handler") || + strings.Contains(stack, "runtime_mcall") || + strings.Contains(stack, "goroutine in C code") { + return nil, nil + } + + // Parse the goroutine's ID from the header line. + h := strings.SplitN(sl[0], " ", 3) + if len(h) < 3 { + return nil, fmt.Errorf("error parsing stack header: %q", sl[0]) + } + id, err := strconv.ParseUint(h[1], 10, 64) + if err != nil { + return nil, fmt.Errorf("error parsing goroutine id: %s", err) + } + + return &goroutine{id: id, stack: strings.TrimSpace(g)}, nil +} + +// interestingGoroutines returns all goroutines we care about for the purpose +// of leak checking. It excludes testing or runtime ones. +func interestingGoroutines(t ErrorReporter) []*goroutine { + buf := make([]byte, 2<<20) + buf = buf[:runtime.Stack(buf, true)] + var gs []*goroutine + for _, g := range strings.Split(string(buf), "\n\n") { + gr, err := interestingGoroutine(g) + if err != nil { + t.Errorf("leaktest: %s", err) + continue + } else if gr == nil { + continue + } + gs = append(gs, gr) + } + sort.Sort(goroutineByID(gs)) + return gs +} + +// ErrorReporter is a tiny subset of a testing.TB to make testing not such a +// massive pain +type ErrorReporter interface { + Errorf(format string, args ...interface{}) +} + +// Check snapshots the currently-running goroutines and returns a +// function to be run at the end of tests to see whether any +// goroutines leaked, waiting up to 5 seconds in error conditions +func Check(t ErrorReporter) func() { + return CheckTimeout(t, 5*time.Second) +} + +// CheckTimeout is the same as Check, but with a configurable timeout +func CheckTimeout(t ErrorReporter, dur time.Duration) func() { + ctx, cancel := context.WithCancel(context.Background()) + fn := CheckContext(ctx, t) + return func() { + timer := time.AfterFunc(dur, cancel) + fn() + // Remember to clean up the timer and context + timer.Stop() + cancel() + } +} + +// CheckContext is the same as Check, but uses a context.Context for +// cancellation and timeout control +func CheckContext(ctx context.Context, t ErrorReporter) func() { + orig := map[uint64]bool{} + for _, g := range interestingGoroutines(t) { + orig[g.id] = true + } + return func() { + var leaked []string + for { + select { + case <-ctx.Done(): + t.Errorf("leaktest: timed out checking goroutines") + default: + leaked = make([]string, 0) + for _, g := range interestingGoroutines(t) { + if !orig[g.id] { + leaked = append(leaked, g.stack) + } + } + if len(leaked) == 0 { + return + } + // don't spin needlessly + time.Sleep(time.Millisecond * 50) + continue + } + break + } + for _, g := range leaked { + t.Errorf("leaktest: leaked goroutine: %v", g) + } + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index beaa4551..f8b62811 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -71,6 +71,9 @@ github.com/facebookgo/grace/gracenet # github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 ## explicit github.com/flynn/go-shlex +# github.com/fortytw2/leaktest v1.3.0 +## explicit +github.com/fortytw2/leaktest # github.com/fsnotify/fsnotify v1.4.9 ## explicit; go 1.13 github.com/fsnotify/fsnotify