TUN-8732: implement port selection algorithm

## Summary
Implements the discovery of the metrics server given an array of addresses (known addresses), tunnelID, and connectorID.

Closes TUN-8732
This commit is contained in:
Luis Neto 2024-12-03 04:07:55 -08:00
parent f884b29d0d
commit 65786597cc
3 changed files with 222 additions and 0 deletions

View File

@ -2,12 +2,17 @@ package diagnostic
import ( import (
"archive/zip" "archive/zip"
"context"
"fmt" "fmt"
"io" "io"
"net/url"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"time" "time"
"github.com/google/uuid"
"github.com/rs/zerolog"
) )
// CreateDiagnosticZipFile create a zip file with the contents from the all // CreateDiagnosticZipFile create a zip file with the contents from the all
@ -67,3 +72,69 @@ func CreateDiagnosticZipFile(base string, paths []string) (zipFileName string, e
zipFileName = archive.Name() zipFileName = archive.Name()
return zipFileName, nil return zipFileName, nil
} }
type AddressableTunnelState struct {
*TunnelState
URL *url.URL
}
func findMetricsServerPredicate(tunnelID, connectorID uuid.UUID) func(state *TunnelState) bool {
if tunnelID != uuid.Nil && connectorID != uuid.Nil {
return func(state *TunnelState) bool {
return state.ConnectorID == connectorID && state.TunnelID == tunnelID
}
} else if tunnelID == uuid.Nil && connectorID != uuid.Nil {
return func(state *TunnelState) bool {
return state.ConnectorID == connectorID
}
} else if tunnelID != uuid.Nil && connectorID == uuid.Nil {
return func(state *TunnelState) bool {
return state.TunnelID == tunnelID
}
}
return func(*TunnelState) bool {
return true
}
}
// The FindMetricsServer will try to find the metrics server url.
// There are two possible error scenarios:
// 1. No instance is found which will only return ErrMetricsServerNotFound
// 2. Multiple instances are found which will return an array of state and ErrMultipleMetricsServerFound
// In case of success, only the state for the instance is returned.
func FindMetricsServer(
log *zerolog.Logger,
client *httpClient,
addresses []string,
) (*AddressableTunnelState, []*AddressableTunnelState, error) {
instances := make([]*AddressableTunnelState, 0)
for _, address := range addresses {
url, err := url.Parse("http://" + address)
if err != nil {
log.Debug().Err(err).Msgf("error parsing address %s", address)
continue
}
client.SetBaseURL(url)
state, err := client.GetTunnelState(context.Background())
if err == nil {
instances = append(instances, &AddressableTunnelState{state, url})
} else {
log.Debug().Err(err).Msgf("error getting tunnel state from address %s", address)
}
}
if len(instances) == 0 {
return nil, nil, ErrMetricsServerNotFound
}
if len(instances) == 1 {
return instances[0], nil, nil
}
return nil, instances, ErrMultipleMetricsServerFound
}

View File

@ -0,0 +1,147 @@
package diagnostic_test
import (
"context"
"net/http"
"net/url"
"sync"
"testing"
"time"
"github.com/facebookgo/grace/gracenet"
"github.com/google/uuid"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/cloudflare/cloudflared/diagnostic"
"github.com/cloudflare/cloudflared/metrics"
"github.com/cloudflare/cloudflared/tunnelstate"
)
func helperCreateServer(t *testing.T, listeners *gracenet.Net, tunnelID uuid.UUID, connectorID uuid.UUID) func() {
t.Helper()
listener, err := metrics.CreateMetricsListener(listeners, "localhost:0")
require.NoError(t, err)
log := zerolog.Nop()
tracker := tunnelstate.NewConnTracker(&log)
handler := diagnostic.NewDiagnosticHandler(&log, 0, nil, tunnelID, connectorID, tracker, nil, []string{})
router := http.NewServeMux()
router.HandleFunc("/diag/tunnel", handler.TunnelStateHandler)
server := &http.Server{
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
Handler: router,
}
var wgroup sync.WaitGroup
wgroup.Add(1)
go func() {
defer wgroup.Done()
_ = server.Serve(listener)
}()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
cleanUp := func() {
_ = server.Shutdown(ctx)
cancel()
wgroup.Wait()
}
return cleanUp
}
func TestFindMetricsServer_WhenSingleServerIsRunning_ReturnState(t *testing.T) {
listeners := gracenet.Net{}
tid1 := uuid.New()
cid1 := uuid.New()
cleanUp := helperCreateServer(t, &listeners, tid1, cid1)
defer cleanUp()
log := zerolog.Nop()
client := diagnostic.NewHTTPClient()
addresses := metrics.GetMetricsKnownAddresses("host")
url1, err := url.Parse("http://localhost:20241")
require.NoError(t, err)
tunnel1 := &diagnostic.AddressableTunnelState{
TunnelState: &diagnostic.TunnelState{
TunnelID: tid1,
ConnectorID: cid1,
Connections: nil,
},
URL: url1,
}
state, tunnels, err := diagnostic.FindMetricsServer(&log, client, addresses[:])
if err != nil {
require.ErrorIs(t, err, diagnostic.ErrMultipleMetricsServerFound)
}
assert.Equal(t, tunnel1, state)
assert.Nil(t, tunnels)
}
func TestFindMetricsServer_WhenMultipleServerAreRunning_ReturnError(t *testing.T) {
listeners := gracenet.Net{}
tid1 := uuid.New()
cid1 := uuid.New()
cid2 := uuid.New()
cleanUp := helperCreateServer(t, &listeners, tid1, cid1)
defer cleanUp()
cleanUp = helperCreateServer(t, &listeners, tid1, cid2)
defer cleanUp()
log := zerolog.Nop()
client := diagnostic.NewHTTPClient()
addresses := metrics.GetMetricsKnownAddresses("host")
url1, err := url.Parse("http://localhost:20241")
require.NoError(t, err)
url2, err := url.Parse("http://localhost:20242")
require.NoError(t, err)
tunnel1 := &diagnostic.AddressableTunnelState{
TunnelState: &diagnostic.TunnelState{
TunnelID: tid1,
ConnectorID: cid1,
Connections: nil,
},
URL: url1,
}
tunnel2 := &diagnostic.AddressableTunnelState{
TunnelState: &diagnostic.TunnelState{
TunnelID: tid1,
ConnectorID: cid2,
Connections: nil,
},
URL: url2,
}
state, tunnels, err := diagnostic.FindMetricsServer(&log, client, addresses[:])
if err != nil {
require.ErrorIs(t, err, diagnostic.ErrMultipleMetricsServerFound)
}
assert.Nil(t, state)
assert.Equal(t, []*diagnostic.AddressableTunnelState{tunnel1, tunnel2}, tunnels)
}
func TestFindMetricsServer_WhenNoInstanceIsRuning_ReturnError(t *testing.T) {
log := zerolog.Nop()
client := diagnostic.NewHTTPClient()
addresses := metrics.GetMetricsKnownAddresses("host")
state, tunnels, err := diagnostic.FindMetricsServer(&log, client, addresses[:])
require.ErrorIs(t, err, diagnostic.ErrMetricsServerNotFound)
assert.Nil(t, state)
assert.Nil(t, tunnels)
}

View File

@ -19,4 +19,8 @@ var (
ErrNoVolumeFound = errors.New("no disk volume information found") ErrNoVolumeFound = errors.New("no disk volume information found")
// Error user when the base url of the diagnostic client is not provided. // Error user when the base url of the diagnostic client is not provided.
ErrNoBaseUrl = errors.New("no base url") ErrNoBaseUrl = errors.New("no base url")
// Error used when no metrics server is found listening to the known addresses list (check [metrics.GetMetricsKnownAddresses])
ErrMetricsServerNotFound = errors.New("metrics server not found")
// Error used when multiple metrics server are found listening to the known addresses list (check [metrics.GetMetricsKnownAddresses])
ErrMultipleMetricsServerFound = errors.New("multiple metrics server found")
) )