TUN-3848: Use transport logger for h2mux
This commit is contained in:
parent
88b53eb886
commit
0d22106416
|
@ -313,11 +313,11 @@ func StartServer(
|
||||||
return fmt.Errorf(errText)
|
return fmt.Errorf(errText)
|
||||||
}
|
}
|
||||||
|
|
||||||
transportLog := logger.CreateTransportLoggerFromContext(c, isUIEnabled)
|
logTransport := logger.CreateTransportLoggerFromContext(c, isUIEnabled)
|
||||||
|
|
||||||
observer := connection.NewObserver(log, isUIEnabled)
|
observer := connection.NewObserver(log, logTransport, isUIEnabled)
|
||||||
|
|
||||||
tunnelConfig, ingressRules, err := prepareTunnelConfig(c, buildInfo, version, log, observer, namedTunnel)
|
tunnelConfig, ingressRules, err := prepareTunnelConfig(c, buildInfo, version, log, logTransport, observer, namedTunnel)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Err(err).Msg("Couldn't start tunnel")
|
log.Err(err).Msg("Couldn't start tunnel")
|
||||||
return err
|
return err
|
||||||
|
@ -364,7 +364,7 @@ func StartServer(
|
||||||
&ingressRules,
|
&ingressRules,
|
||||||
tunnelConfig.HAConnections,
|
tunnelConfig.HAConnections,
|
||||||
)
|
)
|
||||||
app := tunnelUI.Launch(ctx, log, transportLog)
|
app := tunnelUI.Launch(ctx, log, logTransport)
|
||||||
observer.RegisterSink(app)
|
observer.RegisterSink(app)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -149,7 +149,7 @@ func prepareTunnelConfig(
|
||||||
c *cli.Context,
|
c *cli.Context,
|
||||||
buildInfo *buildinfo.BuildInfo,
|
buildInfo *buildinfo.BuildInfo,
|
||||||
version string,
|
version string,
|
||||||
log *zerolog.Logger,
|
log, logTransport *zerolog.Logger,
|
||||||
observer *connection.Observer,
|
observer *connection.Observer,
|
||||||
namedTunnel *connection.NamedTunnelConfig,
|
namedTunnel *connection.NamedTunnelConfig,
|
||||||
) (*origin.TunnelConfig, ingress.Ingress, error) {
|
) (*origin.TunnelConfig, ingress.Ingress, error) {
|
||||||
|
@ -252,9 +252,9 @@ func prepareTunnelConfig(
|
||||||
ReplaceExisting: c.Bool("force"),
|
ReplaceExisting: c.Bool("force"),
|
||||||
}
|
}
|
||||||
muxerConfig := &connection.MuxerConfig{
|
muxerConfig := &connection.MuxerConfig{
|
||||||
HeartbeatInterval: c.Duration("heartbeat-interval"),
|
HeartbeatInterval: c.Duration("heartbeat-interval"),
|
||||||
// Note TUN-3758 , we use Int because UInt is not supported with altsrc
|
// Note TUN-3758 , we use Int because UInt is not supported with altsrc
|
||||||
MaxHeartbeats: uint64(c.Int("heartbeat-count")),
|
MaxHeartbeats: uint64(c.Int("heartbeat-count")),
|
||||||
// Note TUN-3758 , we use Int because UInt is not supported with altsrc
|
// Note TUN-3758 , we use Int because UInt is not supported with altsrc
|
||||||
CompressionSetting: h2mux.CompressionSetting(uint64(c.Int("compression-quality"))),
|
CompressionSetting: h2mux.CompressionSetting(uint64(c.Int("compression-quality"))),
|
||||||
MetricsUpdateFreq: c.Duration("metrics-update-freq"),
|
MetricsUpdateFreq: c.Duration("metrics-update-freq"),
|
||||||
|
@ -272,6 +272,7 @@ func prepareTunnelConfig(
|
||||||
LBPool: c.String("lb-pool"),
|
LBPool: c.String("lb-pool"),
|
||||||
Tags: tags,
|
Tags: tags,
|
||||||
Log: log,
|
Log: log,
|
||||||
|
LogTransport: logTransport,
|
||||||
Observer: observer,
|
Observer: observer,
|
||||||
ReportedVersion: version,
|
ReportedVersion: version,
|
||||||
// Note TUN-3758 , we use Int because UInt is not supported with altsrc
|
// Note TUN-3758 , we use Int because UInt is not supported with altsrc
|
||||||
|
|
|
@ -27,7 +27,7 @@ var (
|
||||||
Scheme: "https",
|
Scheme: "https",
|
||||||
Host: "connectiontest.argotunnel.com",
|
Host: "connectiontest.argotunnel.com",
|
||||||
}
|
}
|
||||||
testObserver = NewObserver(&log, false)
|
testObserver = NewObserver(&log, &log, false)
|
||||||
testLargeResp = make([]byte, largeFileSize)
|
testLargeResp = make([]byte, largeFileSize)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -77,7 +77,7 @@ func NewH2muxConnection(
|
||||||
|
|
||||||
// Establish a muxed connection with the edge
|
// Establish a muxed connection with the edge
|
||||||
// Client mux handshake with agent server
|
// Client mux handshake with agent server
|
||||||
muxer, err := h2mux.Handshake(edgeConn, edgeConn, *muxerConfig.H2MuxerConfig(h, observer.log), h2mux.ActiveStreams)
|
muxer, err := h2mux.Handshake(edgeConn, edgeConn, *muxerConfig.H2MuxerConfig(h, observer.logTransport), h2mux.ActiveStreams)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recoverable := isHandshakeErrRecoverable(err, connIndex, observer)
|
recoverable := isHandshakeErrRecoverable(err, connIndex, observer)
|
||||||
return nil, err, recoverable
|
return nil, err, recoverable
|
||||||
|
|
|
@ -17,6 +17,7 @@ const (
|
||||||
|
|
||||||
type Observer struct {
|
type Observer struct {
|
||||||
log *zerolog.Logger
|
log *zerolog.Logger
|
||||||
|
logTransport *zerolog.Logger
|
||||||
metrics *tunnelMetrics
|
metrics *tunnelMetrics
|
||||||
tunnelEventChan chan Event
|
tunnelEventChan chan Event
|
||||||
uiEnabled bool
|
uiEnabled bool
|
||||||
|
@ -27,9 +28,10 @@ type EventSink interface {
|
||||||
OnTunnelEvent(event Event)
|
OnTunnelEvent(event Event)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewObserver(log *zerolog.Logger, uiEnabled bool) *Observer {
|
func NewObserver(log, logTransport *zerolog.Logger, uiEnabled bool) *Observer {
|
||||||
o := &Observer{
|
o := &Observer{
|
||||||
log: log,
|
log: log,
|
||||||
|
logTransport: logTransport,
|
||||||
metrics: newTunnelMetrics(),
|
metrics: newTunnelMetrics(),
|
||||||
uiEnabled: uiEnabled,
|
uiEnabled: uiEnabled,
|
||||||
tunnelEventChan: make(chan Event, observerChannelBufferSize),
|
tunnelEventChan: make(chan Event, observerChannelBufferSize),
|
||||||
|
|
|
@ -44,7 +44,7 @@ func TestRegisterServerLocation(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestObserverEventsDontBlock(t *testing.T) {
|
func TestObserverEventsDontBlock(t *testing.T) {
|
||||||
observer := NewObserver(&log, false)
|
observer := NewObserver(&log, &log, false)
|
||||||
var mu sync.Mutex
|
var mu sync.Mutex
|
||||||
observer.RegisterSink(EventSinkFunc(func(_ Event) {
|
observer.RegisterSink(EventSinkFunc(func(_ Event) {
|
||||||
// callback will block if lock is already held
|
// callback will block if lock is already held
|
||||||
|
|
|
@ -51,7 +51,8 @@ type Supervisor struct {
|
||||||
nextConnectedIndex int
|
nextConnectedIndex int
|
||||||
nextConnectedSignal chan struct{}
|
nextConnectedSignal chan struct{}
|
||||||
|
|
||||||
log *zerolog.Logger
|
log *zerolog.Logger
|
||||||
|
logTransport *zerolog.Logger
|
||||||
|
|
||||||
reconnectCredentialManager *reconnectCredentialManager
|
reconnectCredentialManager *reconnectCredentialManager
|
||||||
useReconnectToken bool
|
useReconnectToken bool
|
||||||
|
@ -94,6 +95,7 @@ func NewSupervisor(config *TunnelConfig, reconnectCh chan ReconnectSignal, grace
|
||||||
tunnelErrors: make(chan tunnelError),
|
tunnelErrors: make(chan tunnelError),
|
||||||
tunnelsConnecting: map[int]chan struct{}{},
|
tunnelsConnecting: map[int]chan struct{}{},
|
||||||
log: config.Log,
|
log: config.Log,
|
||||||
|
logTransport: config.LogTransport,
|
||||||
reconnectCredentialManager: newReconnectCredentialManager(connection.MetricsNamespace, connection.TunnelSubsystem, config.HAConnections),
|
reconnectCredentialManager: newReconnectCredentialManager(connection.MetricsNamespace, connection.TunnelSubsystem, config.HAConnections),
|
||||||
useReconnectToken: useReconnectToken,
|
useReconnectToken: useReconnectToken,
|
||||||
reconnectCh: reconnectCh,
|
reconnectCh: reconnectCh,
|
||||||
|
@ -350,7 +352,7 @@ func (s *Supervisor) authenticate(ctx context.Context, numPreviousAttempts int)
|
||||||
// This callback is invoked by h2mux when the edge initiates a stream.
|
// This callback is invoked by h2mux when the edge initiates a stream.
|
||||||
return nil // noop
|
return nil // noop
|
||||||
})
|
})
|
||||||
muxerConfig := s.config.MuxerConfig.H2MuxerConfig(handler, s.log)
|
muxerConfig := s.config.MuxerConfig.H2MuxerConfig(handler, s.logTransport)
|
||||||
muxer, err := h2mux.Handshake(edgeConn, edgeConn, *muxerConfig, h2mux.ActiveStreams)
|
muxer, err := h2mux.Handshake(edgeConn, edgeConn, *muxerConfig, h2mux.ActiveStreams)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -51,6 +51,7 @@ type TunnelConfig struct {
|
||||||
LBPool string
|
LBPool string
|
||||||
Tags []tunnelpogs.Tag
|
Tags []tunnelpogs.Tag
|
||||||
Log *zerolog.Logger
|
Log *zerolog.Logger
|
||||||
|
LogTransport *zerolog.Logger
|
||||||
Observer *connection.Observer
|
Observer *connection.Observer
|
||||||
ReportedVersion string
|
ReportedVersion string
|
||||||
Retries uint
|
Retries uint
|
||||||
|
|
|
@ -52,8 +52,9 @@ func TestWaitForBackoffFallback(t *testing.T) {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
config := &TunnelConfig{
|
config := &TunnelConfig{
|
||||||
Log: &log,
|
Log: &log,
|
||||||
|
LogTransport: &log,
|
||||||
ProtocolSelector: protocolSelector,
|
ProtocolSelector: protocolSelector,
|
||||||
Observer: connection.NewObserver(nil, false),
|
Observer: connection.NewObserver(&log, &log, false),
|
||||||
}
|
}
|
||||||
connIndex := uint8(1)
|
connIndex := uint8(1)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue