TUN-8725: implement diagnostic procedure

## Summary
Add a function that orchestrates the diagnostic jobs, producing a zip file with the collected results at the end.

Closes TUN-8725
Luis Neto 2024-12-04 03:37:57 -08:00
parent 451f98e1d1
commit 7bd86762a7
8 changed files with 457 additions and 11 deletions
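
For orientation, here is a minimal sketch (not part of this commit) of how the new `RunDiagnostic` entry point could be invoked by a caller; the standalone `main`, the hard-coded known addresses, and the chosen toggles are illustrative assumptions.

```go
package main

import (
	"os"

	"github.com/rs/zerolog"

	"github.com/cloudflare/cloudflared/diagnostic"
)

func main() {
	log := zerolog.New(os.Stdout).With().Timestamp().Logger()

	options := diagnostic.Options{
		// Known metrics-server addresses; hard-coded here for brevity
		// (normally discovered via the metrics package).
		KnownAddresses: []string{"localhost:20241", "localhost:20242"},
		Toggles: diagnostic.Toggles{
			// Skip the traceroute-based network collection in this sketch.
			NoDiagNetwork: true,
		},
	}

	// On success a cloudflared-diag zip is written to disk and (nil, nil) is returned;
	// if several instances are found, their states come back alongside the error.
	states, err := diagnostic.RunDiagnostic(&log, options)
	if err != nil {
		for _, state := range states {
			log.Error().Msgf("candidate instance: %+v", state)
		}
		log.Fatal().Err(err).Msg("diagnostic failed")
	}
}
```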

@@ -38,7 +38,7 @@ func (client *httpClient) SetBaseURL(baseURL *url.URL) {
 func (client *httpClient) GET(ctx context.Context, endpoint string) (*http.Response, error) {
 	if client.baseURL == nil {
-		return nil, ErrNoBaseUrl
+		return nil, ErrNoBaseURL
 	}
 	url := client.baseURL.JoinPath(endpoint)

@@ -22,4 +22,12 @@ const (
 	goroutineDumpEndpoint       = "debug/pprof/goroutine"
 	metricsEndpoint             = "metrics"
 	tunnelConfigurationEndpoint = "/config"
+	// Base for filenames of the diagnostic procedure
+	systemInformationBaseName = "systeminformation.json"
+	metricsBaseName           = "metrics.txt"
+	zipName                   = "cloudflared-diag"
+	heapPprofBaseName         = "heap.pprof"
+	goroutinePprofBaseName    = "goroutine.pprof"
+	networkBaseName           = "network.json"
+	tunnelStateBaseName       = "tunnelstate.json"
 )

diagnostic/diagnostic.go (new file, 392 additions)

package diagnostic
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/url"
"os"
"path/filepath"
"sync"
"time"
"github.com/rs/zerolog"
network "github.com/cloudflare/cloudflared/diagnostic/network"
)
// Struct used to hold the results of different routines executing the network collection.
type networkCollectionResult struct {
name string
info []*network.Hop
raw string
err error
}
// This type represents the signature shared by the most common collection
// functions of the diagnostic HTTP client.
type collectToWriterFunc func(ctx context.Context, writer io.Writer) error
// This type represents the common denominator among all the collection procedures.
type collectFunc func(ctx context.Context) (string, error)
// collectJob is an internal struct that holds the information necessary
// to run a collection job.
type collectJob struct {
jobName string
fn collectFunc
bypass bool
}
// The Toggles structure denotes the available toggles for the diagnostic procedure.
// Each toggle enables/disables tasks from the diagnostic.
type Toggles struct {
NoDiagLogs bool
NoDiagMetrics bool
NoDiagSystem bool
NoDiagRuntime bool
NoDiagNetwork bool
}
// The Options structure holds every option necessary for
// the diagnostic procedure to work.
type Options struct {
KnownAddresses []string
Address string
ContainerID string
PodID string
Toggles Toggles
}
func collectLogs(
ctx context.Context,
client HTTPClient,
diagContainer, diagPod string,
) (string, error) {
var collector LogCollector
if diagPod != "" {
collector = NewKubernetesLogCollector(diagContainer, diagPod)
} else if diagContainer != "" {
collector = NewDockerLogCollector(diagContainer)
} else {
collector = NewHostLogCollector(client)
}
logInformation, err := collector.Collect(ctx)
if err != nil {
return "", fmt.Errorf("error collecting logs: %w", err)
}
if logInformation.isDirectory {
return CopyFilesFromDirectory(logInformation.path)
}
if logInformation.wasCreated {
return logInformation.path, nil
}
logHandle, err := os.Open(logInformation.path)
if err != nil {
return "", fmt.Errorf("error opening log file while collecting logs: %w", err)
}
defer logHandle.Close()
outputLogHandle, err := os.Create(filepath.Join(os.TempDir(), logFilename))
if err != nil {
return "", ErrCreatingTemporaryFile
}
defer outputLogHandle.Close()
_, err = io.Copy(outputLogHandle, logHandle)
if err != nil {
return "", fmt.Errorf("error copying logs while collecting logs: %w", err)
}
return outputLogHandle.Name(), err
}
func collectNetworkResultRoutine(
ctx context.Context,
collector network.NetworkCollector,
hostname string,
useIPv4 bool,
results chan networkCollectionResult,
) {
const (
hopsNo = 5
timeout = time.Second * 5
)
name := hostname
if useIPv4 {
name += "-v4"
} else {
name += "-v6"
}
hops, raw, err := collector.Collect(ctx, network.NewTraceOptions(hopsNo, timeout, hostname, useIPv4))
if err != nil {
if raw == "" {
// An error happened and there is no raw output
results <- networkCollectionResult{name, nil, "", err}
} else {
// An error happened but there is raw output, so write it to a file
results <- networkCollectionResult{name, nil, raw, nil}
}
} else {
results <- networkCollectionResult{name, hops, raw, nil}
}
}
func collectNetworkInformation(ctx context.Context) (string, error) {
networkCollector := network.NetworkCollectorImpl{}
hostAndIPversionPairs := []struct {
host string
useV4 bool
}{
{"region1.v2.argotunnel.com", true},
{"region1.v2.argotunnel.com", false},
{"region2.v2.argotunnel.com", true},
{"region2.v2.argotunnel.com", false},
}
// the number of results is known thus use len to avoid footguns
results := make(chan networkCollectionResult, len(hostAndIPversionPairs))
var wgroup sync.WaitGroup
for _, item := range hostAndIPversionPairs {
wgroup.Add(1)
go func() {
defer wgroup.Done()
collectNetworkResultRoutine(ctx, &networkCollector, item.host, item.useV4, results)
}()
}
// Wait for routines to end.
wgroup.Wait()
resultMap := make(map[string][]*network.Hop)
for range len(hostAndIPversionPairs) {
result := <-results
resultMap[result.name] = result.info
}
networkDumpHandle, err := os.Create(filepath.Join(os.TempDir(), networkBaseName))
if err != nil {
return "", ErrCreatingTemporaryFile
}
defer networkDumpHandle.Close()
err = json.NewEncoder(networkDumpHandle).Encode(resultMap)
if err != nil {
return "", fmt.Errorf("error encoding network information results: %w", err)
}
return networkDumpHandle.Name(), nil
}
func collectFromEndpointAdapter(collect collectToWriterFunc, fileName string) collectFunc {
return func(ctx context.Context) (string, error) {
dumpHandle, err := os.Create(filepath.Join(os.TempDir(), fileName))
if err != nil {
return "", ErrCreatingTemporaryFile
}
defer dumpHandle.Close()
err = collect(ctx, dumpHandle)
if err != nil {
return "", ErrCreatingTemporaryFile
}
return dumpHandle.Name(), nil
}
}
func tunnelStateCollectEndpointAdapter(client HTTPClient, tunnel *TunnelState, fileName string) collectFunc {
endpointFunc := func(ctx context.Context, writer io.Writer) error {
if tunnel == nil {
// When no metrics server address is provided, the diagnostic queries all known hosts
// and already obtains the tunnel state; when an address is provided that lookup does
// not happen, hence the nil check in this function.
tunnelResponse, err := client.GetTunnelState(ctx)
if err != nil {
return fmt.Errorf("error retrieving tunnel state: %w", err)
}
tunnel = tunnelResponse
}
encoder := json.NewEncoder(writer)
if err := encoder.Encode(tunnel); err != nil {
return fmt.Errorf("error encoding tunnel state: %w", err)
}
return nil
}
return collectFromEndpointAdapter(endpointFunc, fileName)
}
// resolveInstanceBaseURL is responsible for resolving
// the base URL of the instance that should be diagnosed.
// To resolve the instance it may be necessary to query the
// /diag/tunnel endpoint of the known instances; if a single
// instance is found its state is also returned, and if multiple
// instances are found their states are returned in a slice along
// with an error.
func resolveInstanceBaseURL(
metricsServerAddress string,
log *zerolog.Logger,
client *httpClient,
addresses []string,
) (*url.URL, *TunnelState, []*AddressableTunnelState, error) {
if metricsServerAddress != "" {
url, err := url.Parse(metricsServerAddress)
if err != nil {
return nil, nil, nil, fmt.Errorf("provided address is not valid: %w", err)
}
if url.Scheme == "" {
url.Scheme = "http"
}
return url, nil, nil, nil
}
tunnelState, foundTunnelStates, err := FindMetricsServer(log, client, addresses)
if err != nil {
return nil, nil, foundTunnelStates, err
}
return tunnelState.URL, tunnelState.TunnelState, nil, nil
}
func createJobs(
client *httpClient,
tunnel *TunnelState,
diagContainer string,
diagPod string,
noDiagSystem bool,
noDiagRuntime bool,
noDiagMetrics bool,
noDiagLogs bool,
noDiagNetwork bool,
) []collectJob {
jobs := []collectJob{
{
jobName: "tunnel state",
fn: tunnelStateCollectEndpointAdapter(client, tunnel, tunnelStateBaseName),
bypass: false,
},
{
jobName: "system information",
fn: collectFromEndpointAdapter(client.GetSystemInformation, systemInformationBaseName),
bypass: noDiagSystem,
},
{
jobName: "goroutine profile",
fn: collectFromEndpointAdapter(client.GetGoroutineDump, goroutinePprofBaseName),
bypass: noDiagRuntime,
},
{
jobName: "heap profile",
fn: collectFromEndpointAdapter(client.GetMemoryDump, heapPprofBaseName),
bypass: noDiagRuntime,
},
{
jobName: "metrics",
fn: collectFromEndpointAdapter(client.GetMetrics, metricsBaseName),
bypass: noDiagMetrics,
},
{
jobName: "log information",
fn: func(ctx context.Context) (string, error) {
return collectLogs(ctx, client, diagContainer, diagPod)
},
bypass: noDiagLogs,
},
{
jobName: "network information",
fn: collectNetworkInformation,
bypass: noDiagNetwork,
},
}
return jobs
}
func RunDiagnostic(
log *zerolog.Logger,
options Options,
) ([]*AddressableTunnelState, error) {
client := NewHTTPClient()
baseURL, tunnel, foundTunnels, err := resolveInstanceBaseURL(options.Address, log, client, options.KnownAddresses)
if err != nil {
return foundTunnels, err
}
log.Info().Msgf("Selected server %s starting diagnostic...", baseURL.String())
client.SetBaseURL(baseURL)
const timeout = 45 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
paths := make([]string, 0)
jobs := createJobs(
client,
tunnel,
options.ContainerID,
options.PodID,
options.Toggles.NoDiagSystem,
options.Toggles.NoDiagRuntime,
options.Toggles.NoDiagMetrics,
options.Toggles.NoDiagLogs,
options.Toggles.NoDiagNetwork,
)
for _, job := range jobs {
if job.bypass {
continue
}
log.Info().Msgf("Collecting %s...", job.jobName)
path, err := job.fn(ctx)
defer func() {
if !errors.Is(err, ErrCreatingTemporaryFile) {
os.Remove(path)
}
}()
if err != nil {
return nil, err
}
log.Info().Msgf("Collected %s.", job.jobName)
paths = append(paths, path)
}
zipfile, err := CreateDiagnosticZipFile(zipName, paths)
if err != nil {
if zipfile != "" {
os.Remove(zipfile)
}
return nil, err
}
log.Info().Msgf("Diagnostic file written: %v", zipfile)
return nil, nil
}
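
As a note on the job wiring above, the following in-package sketch shows how another endpoint could be appended through collectFromEndpointAdapter; GetBuildInfo and the buildinfo.json file name are hypothetical and exist only to illustrate the adapter pattern.

```go
package diagnostic

// Sketch only: GetBuildInfo and "buildinfo.json" are hypothetical and are not part
// of this commit; they illustrate how collectFromEndpointAdapter turns any
// func(context.Context, io.Writer) error collector into a collectJob entry.
func appendBuildInfoJob(client *httpClient, jobs []collectJob, skip bool) []collectJob {
	return append(jobs, collectJob{
		jobName: "build information",
		fn:      collectFromEndpointAdapter(client.GetBuildInfo, "buildinfo.json"),
		bypass:  skip,
	})
}
```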

@@ -7,8 +7,8 @@ import (
 var (
 	// Error used when there is no log directory available.
 	ErrManagedLogNotFound = errors.New("managed log directory not found")
-	// Error used when one key is not found.
-	ErrMustNotBeEmpty = errors.New("provided argument is empty")
+	// Error used when it is not possible to collect logs using the log configuration.
+	ErrLogConfigurationIsInvalid = errors.New("provided log configuration is invalid")
 	// Error used when parsing the fields of the output of collector.
 	ErrInsufficientLines = errors.New("insufficient lines")
 	// Error used when parsing the lines of the output of collector.
@@ -18,9 +18,11 @@
 	// Error used when there is no disk volume information available.
 	ErrNoVolumeFound = errors.New("no disk volume information found")
 	// Error user when the base url of the diagnostic client is not provided.
-	ErrNoBaseUrl = errors.New("no base url")
-	// Error used when no metrics server is found listening to the known addresses list (check [metrics.GetMetricsKnownAddresses])
+	ErrNoBaseURL = errors.New("no base url")
+	// Error used when no metrics server is found listening to the known addresses list (check [metrics.GetMetricsKnownAddresses]).
 	ErrMetricsServerNotFound = errors.New("metrics server not found")
-	// Error used when multiple metrics server are found listening to the known addresses list (check [metrics.GetMetricsKnownAddresses])
+	// Error used when multiple metrics server are found listening to the known addresses list (check [metrics.GetMetricsKnownAddresses]).
 	ErrMultipleMetricsServerFound = errors.New("multiple metrics server found")
+	// Error used when a temporary file creation fails within the diagnostic procedure
+	ErrCreatingTemporaryFile = errors.New("temporary file creation failed")
 )

@@ -154,6 +154,7 @@ func (handler *Handler) ConfigurationHandler(writer http.ResponseWriter, _ *http
 	switch flag {
 	case logger.LogDirectoryFlag:
+		fallthrough
 	case logger.LogFileFlag:
 		{
 			// the log directory may be relative to the instance thus it must be resolved

@@ -69,5 +69,5 @@ func (collector *HostLogCollector) Collect(ctx context.Context) (*LogInformation
 		return NewLogInformation(logConfiguration.logDirectory, false, true), nil
 	}
-	return nil, ErrMustNotBeEmpty
+	return nil, ErrLogConfigurationIsInvalid
 }

@@ -5,6 +5,7 @@ import (
 	"io"
 	"os"
 	"os/exec"
+	"path/filepath"
 )
 
 func PipeCommandOutputToFile(command *exec.Cmd, outputHandle *os.File) (*LogInformation, error) {
@@ -45,3 +46,45 @@ func PipeCommandOutputToFile(command *exec.Cmd, outputHandle *os.File) (*LogInfo
 	return NewLogInformation(outputHandle.Name(), true, false), nil
 }
+
+func CopyFilesFromDirectory(path string) (string, error) {
+	// rolling logs have as suffix the current date thus
+	// when iterating the path files they are already in
+	// chronological order
+	files, err := os.ReadDir(path)
+	if err != nil {
+		return "", fmt.Errorf("error reading directory %s: %w", path, err)
+	}
+
+	outputHandle, err := os.Create(filepath.Join(os.TempDir(), logFilename))
+	if err != nil {
+		return "", fmt.Errorf("creating file %s: %w", outputHandle.Name(), err)
+	}
+	defer outputHandle.Close()
+
+	for _, file := range files {
+		logHandle, err := os.Open(filepath.Join(path, file.Name()))
+		if err != nil {
+			return "", fmt.Errorf("error opening file %s:%w", file.Name(), err)
+		}
+		defer logHandle.Close()
+
+		_, err = io.Copy(outputHandle, logHandle)
+		if err != nil {
+			return "", fmt.Errorf("error copying file %s:%w", logHandle.Name(), err)
+		}
+	}
+
+	logHandle, err := os.Open(filepath.Join(path, "cloudflared.log"))
+	if err != nil {
+		return "", fmt.Errorf("error opening file %s:%w", logHandle.Name(), err)
+	}
+	defer logHandle.Close()
+
+	_, err = io.Copy(outputHandle, logHandle)
+	if err != nil {
+		return "", fmt.Errorf("error copying file %s:%w", logHandle.Name(), err)
+	}
+
+	return outputHandle.Name(), nil
+}

@@ -43,12 +43,12 @@ func GetMetricsDefaultAddress(runtimeType string) string {
 // startup time to allow a semi-deterministic approach to know where the server is listening at.
 // The ports were selected because at the time we are in 2024 and they do not collide with any
 // know/registered port according https://en.wikipedia.org/wiki/List_of_TCP_and_UDP_port_numbers.
-func GetMetricsKnownAddresses(runtimeType string) [5]string {
-	switch Runtime {
+func GetMetricsKnownAddresses(runtimeType string) []string {
+	switch runtimeType {
 	case "virtual":
-		return [5]string{"0.0.0.0:20241", "0.0.0.0:20242", "0.0.0.0:20243", "0.0.0.0:20244", "0.0.0.0:20245"}
+		return []string{"0.0.0.0:20241", "0.0.0.0:20242", "0.0.0.0:20243", "0.0.0.0:20244", "0.0.0.0:20245"}
 	default:
-		return [5]string{"localhost:20241", "localhost:20242", "localhost:20243", "localhost:20244", "localhost:20245"}
+		return []string{"localhost:20241", "localhost:20242", "localhost:20243", "localhost:20244", "localhost:20245"}
 	}
 }
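
To illustrate how the now slice-typed address list might be consumed, here is a hedged sketch that probes each known address with the diagnostic HTTP client; the probeKnownAddresses helper, the "host" runtime value, and the metrics import path are assumptions, and the real discovery logic lives in FindMetricsServer.

```go
package main

import (
	"context"
	"net/url"
	"os"
	"time"

	"github.com/rs/zerolog"

	"github.com/cloudflare/cloudflared/diagnostic"
	"github.com/cloudflare/cloudflared/metrics"
)

// probeKnownAddresses is a hypothetical helper: it tries each known metrics
// address and logs the tunnel state of every instance that answers /diag/tunnel.
func probeKnownAddresses(ctx context.Context, log *zerolog.Logger, addresses []string) {
	client := diagnostic.NewHTTPClient()
	for _, address := range addresses {
		base, err := url.Parse("http://" + address)
		if err != nil {
			continue
		}
		client.SetBaseURL(base)
		state, err := client.GetTunnelState(ctx)
		if err != nil {
			log.Debug().Err(err).Msgf("no metrics server at %s", address)
			continue
		}
		log.Info().Msgf("metrics server at %s: %+v", address, state)
	}
}

func main() {
	log := zerolog.New(os.Stdout)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	// Any non-"virtual" runtime value selects the localhost address set.
	probeKnownAddresses(ctx, &log, metrics.GetMetricsKnownAddresses("host"))
}
```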