From 02705c44b2e2de0b22ba750539ecd3880160fe2d Mon Sep 17 00:00:00 2001 From: Devin Carr Date: Tue, 13 May 2025 16:11:09 +0000 Subject: [PATCH] TUN-9322: Add metric for unsupported RPC commands for datagram v3 Additionally adds support for the connection index as a label for the datagram v3 specific tunnel metrics. Closes TUN-9322 --- connection/quic_datagram_v3.go | 19 ++++++-- datagramsession/session.go | 2 +- datagramsession/session_test.go | 21 +++++---- quic/metrics.go | 57 +++++++++++++----------- quic/v3/metrics.go | 79 ++++++++++++++++++++------------- quic/v3/metrics_test.go | 11 ++--- quic/v3/muxer.go | 6 +-- quic/v3/muxer_test.go | 4 +- quic/v3/session.go | 14 +++--- quic/v3/session_test.go | 16 +++---- 10 files changed, 133 insertions(+), 96 deletions(-) diff --git a/connection/quic_datagram_v3.go b/connection/quic_datagram_v3.go index 1b42600e..c41f8977 100644 --- a/connection/quic_datagram_v3.go +++ b/connection/quic_datagram_v3.go @@ -2,11 +2,11 @@ package connection import ( "context" - "fmt" "net" "time" "github.com/google/uuid" + "github.com/pkg/errors" "github.com/quic-go/quic-go" "github.com/rs/zerolog" @@ -16,10 +16,17 @@ import ( "github.com/cloudflare/cloudflared/tunnelrpc/pogs" ) +var ( + ErrUnsupportedRPCUDPRegistration = errors.New("datagram v3 does not support RegisterUdpSession RPC") + ErrUnsupportedRPCUDPUnregistration = errors.New("datagram v3 does not support UnregisterUdpSession RPC") +) + type datagramV3Connection struct { - conn quic.Connection + conn quic.Connection + index uint8 // datagramMuxer mux/demux datagrams from quic connection datagramMuxer cfdquic.DatagramConn + metrics cfdquic.Metrics logger *zerolog.Logger } @@ -40,7 +47,9 @@ func NewDatagramV3Connection(ctx context.Context, return &datagramV3Connection{ conn, + index, datagramMuxer, + metrics, logger, } } @@ -50,9 +59,11 @@ func (d *datagramV3Connection) Serve(ctx context.Context) error { } func (d *datagramV3Connection) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeAfterIdleHint time.Duration, traceContext string) (*pogs.RegisterUdpSessionResponse, error) { - return nil, fmt.Errorf("datagram v3 does not support RegisterUdpSession RPC") + d.metrics.UnsupportedRemoteCommand(d.index, "register_udp_session") + return nil, ErrUnsupportedRPCUDPRegistration } func (d *datagramV3Connection) UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID, message string) error { - return fmt.Errorf("datagram v3 does not support UnregisterUdpSession RPC") + d.metrics.UnsupportedRemoteCommand(d.index, "unregister_udp_session") + return ErrUnsupportedRPCUDPUnregistration } diff --git a/datagramsession/session.go b/datagramsession/session.go index bb1369f9..f3b5802f 100644 --- a/datagramsession/session.go +++ b/datagramsession/session.go @@ -84,7 +84,7 @@ func (s *Session) waitForCloseCondition(ctx context.Context, closeAfterIdle time // Closing dstConn cancels read so dstToTransport routine in Serve() can return defer s.dstConn.Close() if closeAfterIdle == 0 { - // provide deafult is caller doesn't specify one + // provide default is caller doesn't specify one closeAfterIdle = defaultCloseIdleAfter } diff --git a/datagramsession/session_test.go b/datagramsession/session_test.go index 997624b7..b8d5308b 100644 --- a/datagramsession/session_test.go +++ b/datagramsession/session_test.go @@ -12,6 +12,7 @@ import ( "github.com/google/uuid" "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" @@ -54,22 +55,22 @@ func testSessionReturns(t *testing.T, closeBy closeMethod, closeAfterIdle time.D closedByRemote, err := session.Serve(ctx, closeAfterIdle) switch closeBy { case closeByContext: - require.Equal(t, context.Canceled, err) - require.False(t, closedByRemote) + assert.Equal(t, context.Canceled, err) + assert.False(t, closedByRemote) case closeByCallingClose: - require.Equal(t, localCloseReason, err) - require.Equal(t, localCloseReason.byRemote, closedByRemote) + assert.Equal(t, localCloseReason, err) + assert.Equal(t, localCloseReason.byRemote, closedByRemote) case closeByTimeout: - require.Equal(t, SessionIdleErr(closeAfterIdle), err) - require.False(t, closedByRemote) + assert.Equal(t, SessionIdleErr(closeAfterIdle), err) + assert.False(t, closedByRemote) } close(sessionDone) }() go func() { n, err := session.transportToDst(payload) - require.NoError(t, err) - require.Equal(t, len(payload), n) + assert.NoError(t, err) + assert.Equal(t, len(payload), n) }() readBuffer := make([]byte, len(payload)+1) @@ -84,6 +85,8 @@ func testSessionReturns(t *testing.T, closeBy closeMethod, closeAfterIdle time.D cancel() case closeByCallingClose: session.close(localCloseReason) + default: + // ignore } <-sessionDone @@ -128,7 +131,7 @@ func testActiveSessionNotClosed(t *testing.T, readFromDst bool, writeToDst bool) ctx, cancel := context.WithCancel(context.Background()) errGroup, ctx := errgroup.WithContext(ctx) errGroup.Go(func() error { - session.Serve(ctx, closeAfterIdle) + _, _ = session.Serve(ctx, closeAfterIdle) if time.Now().Before(startTime.Add(activeTime)) { return fmt.Errorf("session closed while it's still active") } diff --git a/quic/metrics.go b/quic/metrics.go index 0744f275..fb6f3e08 100644 --- a/quic/metrics.go +++ b/quic/metrics.go @@ -11,12 +11,15 @@ import ( ) const ( - namespace = "quic" + namespace = "quic" + ConnectionIndexMetricLabel = "conn_index" + frameTypeMetricLabel = "frame_type" + packetTypeMetricLabel = "packet_type" + reasonMetricLabel = "reason" ) var ( - clientConnLabels = []string{"conn_index"} - clientMetrics = struct { + clientMetrics = struct { totalConnections prometheus.Counter closedConnections prometheus.Counter maxUDPPayloadSize *prometheus.GaugeVec @@ -35,7 +38,7 @@ var ( congestionState *prometheus.GaugeVec }{ totalConnections: prometheus.NewCounter( - prometheus.CounterOpts{ + prometheus.CounterOpts{ //nolint:promlinter Namespace: namespace, Subsystem: "client", Name: "total_connections", @@ -43,7 +46,7 @@ var ( }, ), closedConnections: prometheus.NewCounter( - prometheus.CounterOpts{ + prometheus.CounterOpts{ //nolint:promlinter Namespace: namespace, Subsystem: "client", Name: "closed_connections", @@ -57,70 +60,70 @@ var ( Name: "max_udp_payload", Help: "Maximum UDP payload size in bytes for a QUIC packet", }, - clientConnLabels, + []string{ConnectionIndexMetricLabel}, ), sentFrames: prometheus.NewCounterVec( - prometheus.CounterOpts{ + prometheus.CounterOpts{ //nolint:promlinter Namespace: namespace, Subsystem: "client", Name: "sent_frames", Help: "Number of frames that have been sent through a connection", }, - append(clientConnLabels, "frame_type"), + []string{ConnectionIndexMetricLabel, frameTypeMetricLabel}, ), sentBytes: prometheus.NewCounterVec( - prometheus.CounterOpts{ + prometheus.CounterOpts{ //nolint:promlinter Namespace: namespace, Subsystem: "client", Name: "sent_bytes", Help: "Number of bytes that have been sent through a connection", }, - clientConnLabels, + []string{ConnectionIndexMetricLabel}, ), receivedFrames: prometheus.NewCounterVec( - prometheus.CounterOpts{ + prometheus.CounterOpts{ //nolint:promlinter Namespace: namespace, Subsystem: "client", Name: "received_frames", Help: "Number of frames that have been received through a connection", }, - append(clientConnLabels, "frame_type"), + []string{ConnectionIndexMetricLabel, frameTypeMetricLabel}, ), receivedBytes: prometheus.NewCounterVec( - prometheus.CounterOpts{ + prometheus.CounterOpts{ //nolint:promlinter Namespace: namespace, Subsystem: "client", Name: "receive_bytes", Help: "Number of bytes that have been received through a connection", }, - clientConnLabels, + []string{ConnectionIndexMetricLabel}, ), bufferedPackets: prometheus.NewCounterVec( - prometheus.CounterOpts{ + prometheus.CounterOpts{ //nolint:promlinter Namespace: namespace, Subsystem: "client", Name: "buffered_packets", Help: "Number of bytes that have been buffered on a connection", }, - append(clientConnLabels, "packet_type"), + []string{ConnectionIndexMetricLabel, packetTypeMetricLabel}, ), droppedPackets: prometheus.NewCounterVec( - prometheus.CounterOpts{ + prometheus.CounterOpts{ //nolint:promlinter Namespace: namespace, Subsystem: "client", Name: "dropped_packets", Help: "Number of bytes that have been dropped on a connection", }, - append(clientConnLabels, "packet_type", "reason"), + []string{ConnectionIndexMetricLabel, packetTypeMetricLabel, reasonMetricLabel}, ), lostPackets: prometheus.NewCounterVec( - prometheus.CounterOpts{ + prometheus.CounterOpts{ //nolint:promlinter Namespace: namespace, Subsystem: "client", Name: "lost_packets", Help: "Number of packets that have been lost from a connection", }, - append(clientConnLabels, "reason"), + []string{ConnectionIndexMetricLabel, reasonMetricLabel}, ), minRTT: prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -129,7 +132,7 @@ var ( Name: "min_rtt", Help: "Lowest RTT measured on a connection in millisec", }, - clientConnLabels, + []string{ConnectionIndexMetricLabel}, ), latestRTT: prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -138,7 +141,7 @@ var ( Name: "latest_rtt", Help: "Latest RTT measured on a connection", }, - clientConnLabels, + []string{ConnectionIndexMetricLabel}, ), smoothedRTT: prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -147,7 +150,7 @@ var ( Name: "smoothed_rtt", Help: "Calculated smoothed RTT measured on a connection in millisec", }, - clientConnLabels, + []string{ConnectionIndexMetricLabel}, ), mtu: prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -156,7 +159,7 @@ var ( Name: "mtu", Help: "Current maximum transmission unit (MTU) of a connection", }, - clientConnLabels, + []string{ConnectionIndexMetricLabel}, ), congestionWindow: prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -165,7 +168,7 @@ var ( Name: "congestion_window", Help: "Current congestion window size", }, - clientConnLabels, + []string{ConnectionIndexMetricLabel}, ), congestionState: prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -174,13 +177,13 @@ var ( Name: "congestion_state", Help: "Current congestion control state. See https://pkg.go.dev/github.com/quic-go/quic-go@v0.45.0/logging#CongestionState for what each value maps to", }, - clientConnLabels, + []string{ConnectionIndexMetricLabel}, ), } registerClient = sync.Once{} - packetTooBigDropped = prometheus.NewCounter(prometheus.CounterOpts{ + packetTooBigDropped = prometheus.NewCounter(prometheus.CounterOpts{ //nolint:promlinter Namespace: namespace, Subsystem: "client", Name: "packet_too_big_dropped", diff --git a/quic/v3/metrics.go b/quic/v3/metrics.go index 8f9cf19e..8f330af8 100644 --- a/quic/v3/metrics.go +++ b/quic/v3/metrics.go @@ -2,82 +2,98 @@ package v3 import ( "github.com/prometheus/client_golang/prometheus" + + "github.com/cloudflare/cloudflared/quic" ) const ( namespace = "cloudflared" subsystem = "udp" + + commandMetricLabel = "command" ) type Metrics interface { - IncrementFlows() - DecrementFlows() - PayloadTooLarge() - RetryFlowResponse() - MigrateFlow() + IncrementFlows(connIndex uint8) + DecrementFlows(connIndex uint8) + PayloadTooLarge(connIndex uint8) + RetryFlowResponse(connIndex uint8) + MigrateFlow(connIndex uint8) + UnsupportedRemoteCommand(connIndex uint8, command string) } type metrics struct { - activeUDPFlows prometheus.Gauge - totalUDPFlows prometheus.Counter - payloadTooLarge prometheus.Counter - retryFlowResponses prometheus.Counter - migratedFlows prometheus.Counter + activeUDPFlows *prometheus.GaugeVec + totalUDPFlows *prometheus.CounterVec + payloadTooLarge *prometheus.CounterVec + retryFlowResponses *prometheus.CounterVec + migratedFlows *prometheus.CounterVec + unsupportedRemoteCommands *prometheus.CounterVec } -func (m *metrics) IncrementFlows() { - m.totalUDPFlows.Inc() - m.activeUDPFlows.Inc() +func (m *metrics) IncrementFlows(connIndex uint8) { + m.totalUDPFlows.WithLabelValues(string(connIndex)).Inc() + m.activeUDPFlows.WithLabelValues(string(connIndex)).Inc() } -func (m *metrics) DecrementFlows() { - m.activeUDPFlows.Dec() +func (m *metrics) DecrementFlows(connIndex uint8) { + m.activeUDPFlows.WithLabelValues(string(connIndex)).Dec() } -func (m *metrics) PayloadTooLarge() { - m.payloadTooLarge.Inc() +func (m *metrics) PayloadTooLarge(connIndex uint8) { + m.payloadTooLarge.WithLabelValues(string(connIndex)).Inc() } -func (m *metrics) RetryFlowResponse() { - m.retryFlowResponses.Inc() +func (m *metrics) RetryFlowResponse(connIndex uint8) { + m.retryFlowResponses.WithLabelValues(string(connIndex)).Inc() } -func (m *metrics) MigrateFlow() { - m.migratedFlows.Inc() +func (m *metrics) MigrateFlow(connIndex uint8) { + m.migratedFlows.WithLabelValues(string(connIndex)).Inc() +} + +func (m *metrics) UnsupportedRemoteCommand(connIndex uint8, command string) { + m.unsupportedRemoteCommands.WithLabelValues(string(connIndex), command).Inc() } func NewMetrics(registerer prometheus.Registerer) Metrics { m := &metrics{ - activeUDPFlows: prometheus.NewGauge(prometheus.GaugeOpts{ + activeUDPFlows: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, Name: "active_flows", Help: "Concurrent count of UDP flows that are being proxied to any origin", - }), - totalUDPFlows: prometheus.NewCounter(prometheus.CounterOpts{ + }, []string{quic.ConnectionIndexMetricLabel}), + totalUDPFlows: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter Namespace: namespace, Subsystem: subsystem, Name: "total_flows", Help: "Total count of UDP flows that have been proxied to any origin", - }), - payloadTooLarge: prometheus.NewCounter(prometheus.CounterOpts{ + }, []string{quic.ConnectionIndexMetricLabel}), + payloadTooLarge: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter Namespace: namespace, Subsystem: subsystem, Name: "payload_too_large", Help: "Total count of UDP flows that have had origin payloads that are too large to proxy", - }), - retryFlowResponses: prometheus.NewCounter(prometheus.CounterOpts{ + }, []string{quic.ConnectionIndexMetricLabel}), + retryFlowResponses: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter Namespace: namespace, Subsystem: subsystem, Name: "retry_flow_responses", Help: "Total count of UDP flows that have had to send their registration response more than once", - }), - migratedFlows: prometheus.NewCounter(prometheus.CounterOpts{ + }, []string{quic.ConnectionIndexMetricLabel}), + migratedFlows: prometheus.NewCounterVec(prometheus.CounterOpts{ //nolint:promlinter Namespace: namespace, Subsystem: subsystem, Name: "migrated_flows", Help: "Total count of UDP flows have been migrated across local connections", - }), + }, []string{quic.ConnectionIndexMetricLabel}), + unsupportedRemoteCommands: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "unsupported_remote_command_total", + Help: "Total count of unsupported remote RPC commands for the ", + }, []string{quic.ConnectionIndexMetricLabel, commandMetricLabel}), } registerer.MustRegister( m.activeUDPFlows, @@ -85,6 +101,7 @@ func NewMetrics(registerer prometheus.Registerer) Metrics { m.payloadTooLarge, m.retryFlowResponses, m.migratedFlows, + m.unsupportedRemoteCommands, ) return m } diff --git a/quic/v3/metrics_test.go b/quic/v3/metrics_test.go index 5f6b18a7..80e815e5 100644 --- a/quic/v3/metrics_test.go +++ b/quic/v3/metrics_test.go @@ -2,8 +2,9 @@ package v3_test type noopMetrics struct{} -func (noopMetrics) IncrementFlows() {} -func (noopMetrics) DecrementFlows() {} -func (noopMetrics) PayloadTooLarge() {} -func (noopMetrics) RetryFlowResponse() {} -func (noopMetrics) MigrateFlow() {} +func (noopMetrics) IncrementFlows(connIndex uint8) {} +func (noopMetrics) DecrementFlows(connIndex uint8) {} +func (noopMetrics) PayloadTooLarge(connIndex uint8) {} +func (noopMetrics) RetryFlowResponse(connIndex uint8) {} +func (noopMetrics) MigrateFlow(connIndex uint8) {} +func (noopMetrics) UnsupportedRemoteCommand(connIndex uint8, command string) {} diff --git a/quic/v3/muxer.go b/quic/v3/muxer.go index 09a95e56..4f5605bf 100644 --- a/quic/v3/muxer.go +++ b/quic/v3/muxer.go @@ -264,10 +264,10 @@ func (c *datagramConn) handleSessionRegistrationDatagram(ctx context.Context, da return } log = log.With().Str(logSrcKey, session.LocalAddr().String()).Logger() - c.metrics.IncrementFlows() + c.metrics.IncrementFlows(c.index) // Make sure to eventually remove the session from the session manager when the session is closed defer c.sessionManager.UnregisterSession(session.ID()) - defer c.metrics.DecrementFlows() + defer c.metrics.DecrementFlows(c.index) // Respond that we are able to process the new session err = c.SendUDPSessionResponse(datagram.RequestID, ResponseOk) @@ -315,7 +315,7 @@ func (c *datagramConn) handleSessionAlreadyRegistered(requestID RequestID, logge // The session is already running in another routine so we want to restart the idle timeout since no proxied // packets have come down yet. session.ResetIdleTimer() - c.metrics.RetryFlowResponse() + c.metrics.RetryFlowResponse(c.index) logger.Debug().Msgf("flow registration response retry") } diff --git a/quic/v3/muxer_test.go b/quic/v3/muxer_test.go index 1e17c215..9fd09ca0 100644 --- a/quic/v3/muxer_test.go +++ b/quic/v3/muxer_test.go @@ -781,12 +781,12 @@ func newICMPDatagram(pk *packet.ICMP) []byte { // Cancel the provided context and make sure it closes with the expected cancellation error func assertContextClosed(t *testing.T, ctx context.Context, done <-chan error, cancel context.CancelCauseFunc) { - cancel(expectedContextCanceled) + cancel(errExpectedContextCanceled) err := <-done if !errors.Is(err, context.Canceled) { t.Fatal(err) } - if !errors.Is(context.Cause(ctx), expectedContextCanceled) { + if !errors.Is(context.Cause(ctx), errExpectedContextCanceled) { t.Fatal(err) } } diff --git a/quic/v3/session.go b/quic/v3/session.go index 6836aed9..82f71b0d 100644 --- a/quic/v3/session.go +++ b/quic/v3/session.go @@ -27,11 +27,11 @@ const ( ) // SessionCloseErr indicates that the session's Close method was called. -var SessionCloseErr error = errors.New("flow was closed directly") +var SessionCloseErr error = errors.New("flow was closed directly") //nolint:errname // SessionIdleErr is returned when the session was closed because there was no communication // in either direction over the session for the timeout period. -type SessionIdleErr struct { +type SessionIdleErr struct { //nolint:errname timeout time.Duration } @@ -149,7 +149,8 @@ func (s *session) Migrate(eyeball DatagramConn, ctx context.Context, logger *zer } // The session is already running so we want to restart the idle timeout since no proxied packets have come down yet. s.markActive() - s.metrics.MigrateFlow() + connectionIndex := eyeball.ID() + s.metrics.MigrateFlow(connectionIndex) } func (s *session) Serve(ctx context.Context) error { @@ -160,7 +161,7 @@ func (s *session) Serve(ctx context.Context) error { // To perform a zero copy write when passing the datagram to the connection, we prepare the buffer with // the required datagram header information. We can reuse this buffer for this session since the header is the // same for the each read. - MarshalPayloadHeaderTo(s.id, readBuffer[:DatagramPayloadHeaderLen]) + _ = MarshalPayloadHeaderTo(s.id, readBuffer[:DatagramPayloadHeaderLen]) for { // Read from the origin UDP socket n, err := s.origin.Read(readBuffer[DatagramPayloadHeaderLen:]) @@ -177,7 +178,8 @@ func (s *session) Serve(ctx context.Context) error { continue } if n > maxDatagramPayloadLen { - s.metrics.PayloadTooLarge() + connectionIndex := s.ConnectionID() + s.metrics.PayloadTooLarge(connectionIndex) s.log.Error().Int(logPacketSizeKey, n).Msg("flow (origin) packet read was too large and was dropped") continue } @@ -241,7 +243,7 @@ func (s *session) waitForCloseCondition(ctx context.Context, closeAfterIdle time // Closing the session at the end cancels read so Serve() can return defer s.Close() if closeAfterIdle == 0 { - // provide deafult is caller doesn't specify one + // Provided that the default caller doesn't specify one closeAfterIdle = defaultCloseIdleAfter } diff --git a/quic/v3/session_test.go b/quic/v3/session_test.go index b739ca2d..a43a811f 100644 --- a/quic/v3/session_test.go +++ b/quic/v3/session_test.go @@ -17,7 +17,7 @@ import ( ) var ( - expectedContextCanceled = errors.New("expected context canceled") + errExpectedContextCanceled = errors.New("expected context canceled") testOriginAddr = net.UDPAddrFromAddrPort(netip.MustParseAddrPort("127.0.0.1:0")) testLocalAddr = net.UDPAddrFromAddrPort(netip.MustParseAddrPort("127.0.0.1:0")) @@ -40,7 +40,7 @@ func testSessionWrite(t *testing.T, payload []byte) { serverRead := make(chan []byte, 1) go func() { read := make([]byte, 1500) - server.Read(read[:]) + _, _ = server.Read(read[:]) serverRead <- read }() // Create session and write to origin @@ -110,12 +110,12 @@ func testSessionServe_Origin(t *testing.T, payload []byte) { case data := <-eyeball.recvData: // check received data matches provided from origin expectedData := makePayload(1500) - v3.MarshalPayloadHeaderTo(testRequestID, expectedData[:]) + _ = v3.MarshalPayloadHeaderTo(testRequestID, expectedData[:]) copy(expectedData[17:], payload) if !slices.Equal(expectedData[:v3.DatagramPayloadHeaderLen+len(payload)], data) { t.Fatal("expected datagram did not equal expected") } - cancel(expectedContextCanceled) + cancel(errExpectedContextCanceled) case err := <-ctx.Done(): // we expect the payload to return before the context to cancel on the session t.Fatal(err) @@ -125,7 +125,7 @@ func testSessionServe_Origin(t *testing.T, payload []byte) { if !errors.Is(err, context.Canceled) { t.Fatal(err) } - if !errors.Is(context.Cause(ctx), expectedContextCanceled) { + if !errors.Is(context.Cause(ctx), errExpectedContextCanceled) { t.Fatal(err) } } @@ -198,7 +198,7 @@ func TestSessionServe_Migrate(t *testing.T) { // Origin sends data payload2 := []byte{0xde} - pipe1.Write(payload2) + _, _ = pipe1.Write(payload2) // Expect write to eyeball2 data := <-eyeball2.recvData @@ -249,13 +249,13 @@ func TestSessionServe_Migrate_CloseContext2(t *testing.T) { t.Fatalf("expected session to still be running") default: } - if context.Cause(eyeball1Ctx) != contextCancelErr { + if !errors.Is(context.Cause(eyeball1Ctx), contextCancelErr) { t.Fatalf("first eyeball context should be cancelled manually: %+v", context.Cause(eyeball1Ctx)) } // Origin sends data payload2 := []byte{0xde} - pipe1.Write(payload2) + _, _ = pipe1.Write(payload2) // Expect write to eyeball2 data := <-eyeball2.recvData