cloudflared-mirror/datagramsession/manager.go

140 lines
4.1 KiB
Go

package datagramsession
import (
"context"
"io"
"github.com/google/uuid"
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"
)
// requestChanCapacity is the buffer size used for the manager's datagramChan.
const requestChanCapacity = 16
// Manager defines the APIs to manage sessions from the same transport.
// In the implementation in this file, RegisterSession and UnregisterSession hand their
// requests to the event loop started by Serve, so they block until that loop picks the
// request up or their context is done.
type Manager interface {
// Serve starts the event loop and blocks until it terminates, returning the error
// that caused it to stop.
Serve(ctx context.Context) error
// RegisterSession starts tracking a session under sessionID; dstConn is handed to the
// new session. Caller is responsible for starting the session.
// It returns ctx.Err() if ctx is done before the request is accepted.
RegisterSession(ctx context.Context, sessionID uuid.UUID, dstConn io.ReadWriteCloser) (*Session, error)
// UnregisterSession stops tracking the session and terminates it. message and byRemote
// are carried in the error the session is closed with.
// It returns ctx.Err() if ctx is done before the request is accepted.
UnregisterSession(ctx context.Context, sessionID uuid.UUID, message string, byRemote bool) error
}
// manager is the Manager implementation. The sessions map is only read and written by the
// event loop goroutine started in Serve; other goroutines reach it through the channels.
type manager struct {
// registrationChan carries requests from RegisterSession to the event loop (unbuffered).
registrationChan chan *registerSessionEvent
// unregistrationChan carries requests from UnregisterSession to the event loop (unbuffered).
unregistrationChan chan *unregisterSessionEvent
// datagramChan carries datagrams received from transport to the event loop (buffered).
datagramChan chan *newDatagram
// transport is where incoming datagrams are received from; it is also passed to each new session.
transport transport
// sessions maps a session ID to the session currently tracked under that ID.
sessions map[uuid.UUID]*Session
// log reports datagrams for unknown sessions and failed writes, and is passed to each new session.
log *zerolog.Logger
}
// NewManager builds a Manager that receives datagrams from transport and logs through log.
// The returned manager tracks no sessions yet and does nothing until Serve is called.
func NewManager(transport transport, log *zerolog.Logger) Manager {
	mgr := &manager{
		transport: transport,
		log:       log,
		sessions:  make(map[uuid.UUID]*Session),
	}
	// Registration and unregistration requests are handed over synchronously.
	mgr.registrationChan = make(chan *registerSessionEvent)
	mgr.unregistrationChan = make(chan *unregisterSessionEvent)
	// datagramChan is buffered, so it can read more datagrams from transport while the event loop is processing other events
	mgr.datagramChan = make(chan *newDatagram, requestChanCapacity)
	return mgr
}
// Serve runs two goroutines under one errgroup and blocks until both have returned:
//   - a receive loop that reads datagrams from the transport and queues them on datagramChan;
//   - an event loop that dispatches datagrams, registrations and unregistrations.
//
// It returns the first non-nil error produced by either goroutine. The group's context is
// cancelled as soon as one of them fails, which stops the other at its next ctx check.
// Note that the receive loop only checks ctx after ReceiveFrom has returned.
func (m *manager) Serve(ctx context.Context) error {
	group, ctx := errgroup.WithContext(ctx)

	receiveLoop := func() error {
		for {
			id, data, err := m.transport.ReceiveFrom()
			if err != nil {
				return err
			}
			incoming := &newDatagram{sessionID: id, payload: data}
			select {
			// Only the event loop routine can update/lookup the sessions map to avoid concurrent access
			// Send the datagram to the event loop. It will find the session to send to
			case m.datagramChan <- incoming:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}

	eventLoop := func() error {
		for {
			select {
			case incoming := <-m.datagramChan:
				m.sendToSession(incoming)
			case reg := <-m.registrationChan:
				m.registerSession(ctx, reg)
			// TODO: TUN-5422: Unregister inactive session upon timeout
			case unreg := <-m.unregistrationChan:
				m.unregisterSession(unreg)
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}

	group.Go(receiveLoop)
	group.Go(eventLoop)
	return group.Wait()
}
// RegisterSession asks the event loop to create and track a session for sessionID that
// uses originProxy, then waits for the created session.
//
// It returns ctx.Err() if ctx is done before the event loop accepts the request. Once the
// request has been accepted, the wait for the result does not observe ctx.
func (m *manager) RegisterSession(ctx context.Context, sessionID uuid.UUID, originProxy io.ReadWriteCloser) (*Session, error) {
	request := newRegisterSessionEvent(sessionID, originProxy)
	select {
	case m.registrationChan <- request:
		// Accepted by the event loop; fall through and wait for the result.
	case <-ctx.Done():
		return nil, ctx.Err()
	}
	return <-request.resultChan, nil
}
// registerSession creates a session for the request, stores it in the sessions map and
// sends it back on the request's result channel. It is called from the event loop in Serve.
// ctx is currently unused.
// NOTE(review): an existing entry with the same session ID is replaced without being closed.
func (m *manager) registerSession(ctx context.Context, registration *registerSessionEvent) {
	id := registration.sessionID
	created := newSession(id, m.transport, registration.originProxy, m.log)
	m.sessions[id] = created
	registration.resultChan <- created
}
// UnregisterSession asks the event loop to stop tracking sessionID and close the session
// with an errClosedSession built from message and byRemote.
//
// It returns nil once the event loop has accepted the request (it does not wait for the
// session to be closed), or ctx.Err() if ctx is done first.
func (m *manager) UnregisterSession(ctx context.Context, sessionID uuid.UUID, message string, byRemote bool) error {
	reason := &errClosedSession{
		message:  message,
		byRemote: byRemote,
	}
	request := &unregisterSessionEvent{sessionID: sessionID, err: reason}

	select {
	case m.unregistrationChan <- request:
	case <-ctx.Done():
		return ctx.Err()
	}
	return nil
}
// unregisterSession removes the requested session from the sessions map and closes it with
// the error carried by the request. Requests for unknown session IDs are ignored.
// It is called from the event loop in Serve.
func (m *manager) unregisterSession(unregistration *unregisterSessionEvent) {
	id := unregistration.sessionID
	tracked, found := m.sessions[id]
	if !found {
		return
	}
	// Drop the map entry first, then close the session.
	delete(m.sessions, id)
	tracked.close(unregistration.err)
}
// sendToSession forwards a datagram's payload to the session tracked under its session ID.
// A datagram for an unknown session is logged and dropped; a failed write is logged.
// It is called from the event loop in Serve.
func (m *manager) sendToSession(datagram *newDatagram) {
	id := datagram.sessionID
	target, found := m.sessions[id]
	if !found {
		m.log.Error().Str("sessionID", id.String()).Msg("session not found")
		return
	}
	// session writes to destination over a connected UDP socket, which should not be blocking, so this call doesn't
	// need to run in another go routine
	if _, err := target.transportToDst(datagram.payload); err != nil {
		m.log.Err(err).Str("sessionID", id.String()).Msg("Failed to write payload to session")
	}
}