From c0f85ab85b7def0d61ccff4fa5c2a126345abecd Mon Sep 17 00:00:00 2001 From: cthuang Date: Mon, 28 Mar 2022 10:06:28 +0100 Subject: [PATCH] TUN-5956: Add timeout to session manager APIs --- datagramsession/manager.go | 13 ++++++++++++- datagramsession/manager_test.go | 22 ++++++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/datagramsession/manager.go b/datagramsession/manager.go index caf9d4fd..c4144279 100644 --- a/datagramsession/manager.go +++ b/datagramsession/manager.go @@ -3,6 +3,7 @@ package datagramsession import ( "context" "io" + "time" "github.com/google/uuid" "github.com/lucas-clemente/quic-go" @@ -12,6 +13,7 @@ import ( const ( requestChanCapacity = 16 + defaultReqTimeout = time.Second * 5 ) // Manager defines the APIs to manage sessions from the same transport. @@ -31,9 +33,11 @@ type manager struct { transport transport sessions map[uuid.UUID]*Session log *zerolog.Logger + // timeout waiting for an API to finish. This can be overriden in test + timeout time.Duration } -func NewManager(transport transport, log *zerolog.Logger) Manager { +func NewManager(transport transport, log *zerolog.Logger) *manager { return &manager{ registrationChan: make(chan *registerSessionEvent), unregistrationChan: make(chan *unregisterSessionEvent), @@ -42,6 +46,7 @@ func NewManager(transport transport, log *zerolog.Logger) Manager { transport: transport, sessions: make(map[uuid.UUID]*Session), log: log, + timeout: defaultReqTimeout, } } @@ -89,9 +94,12 @@ func (m *manager) Serve(ctx context.Context) error { } func (m *manager) RegisterSession(ctx context.Context, sessionID uuid.UUID, originProxy io.ReadWriteCloser) (*Session, error) { + ctx, cancel := context.WithTimeout(ctx, m.timeout) + defer cancel() event := newRegisterSessionEvent(sessionID, originProxy) select { case <-ctx.Done(): + m.log.Error().Msg("Datagram session registration timeout") return nil, ctx.Err() case m.registrationChan <- event: session := <-event.resultChan @@ -106,6 +114,8 @@ func (m *manager) registerSession(ctx context.Context, registration *registerSes } func (m *manager) UnregisterSession(ctx context.Context, sessionID uuid.UUID, message string, byRemote bool) error { + ctx, cancel := context.WithTimeout(ctx, m.timeout) + defer cancel() event := &unregisterSessionEvent{ sessionID: sessionID, err: &errClosedSession{ @@ -115,6 +125,7 @@ func (m *manager) UnregisterSession(ctx context.Context, sessionID uuid.UUID, me } select { case <-ctx.Done(): + m.log.Error().Msg("Datagram session unregistration timeout") return ctx.Err() case m.unregistrationChan <- event: return nil diff --git a/datagramsession/manager_test.go b/datagramsession/manager_test.go index e81147df..823c55bb 100644 --- a/datagramsession/manager_test.go +++ b/datagramsession/manager_test.go @@ -120,6 +120,28 @@ func TestManagerServe(t *testing.T) { <-serveDone } +func TestTimeout(t *testing.T) { + const ( + testTimeout = time.Millisecond * 50 + ) + log := zerolog.Nop() + transport := &mockQUICTransport{ + reqChan: newDatagramChannel(1), + respChan: newDatagramChannel(1), + } + mg := NewManager(transport, &log) + mg.timeout = testTimeout + ctx := context.Background() + sessionID := uuid.New() + // session manager is not running, so event loop is not running and therefore calling the APIs should timeout + session, err := mg.RegisterSession(ctx, sessionID, nil) + require.ErrorIs(t, err, context.DeadlineExceeded) + require.Nil(t, session) + + err = mg.UnregisterSession(ctx, sessionID, "session gone", true) + require.ErrorIs(t, err, context.DeadlineExceeded) +} + type mockOrigin struct { expectMsgCount int expectedMsg []byte