diff --git a/management/service.go b/management/service.go index 0514178a..daf5687f 100644 --- a/management/service.go +++ b/management/service.go @@ -5,6 +5,7 @@ import ( "net/http" "sync" "sync/atomic" + "time" "github.com/go-chi/chi/v5" "github.com/rs/zerolog" @@ -19,6 +20,9 @@ const ( // value will return this error to incoming requests. StatusSessionLimitExceeded websocket.StatusCode = 4002 reasonSessionLimitExceeded = "limit exceeded for streaming sessions" + + StatusIdleLimitExceeded websocket.StatusCode = 4003 + reasonIdleLimitExceeded = "session was idle for too long" ) type ManagementService struct { @@ -147,10 +151,20 @@ func (m *ManagementService) logs(w http.ResponseWriter, r *http.Request) { } // Make sure the connection is closed if other go routines fail to close the connection after completing. defer c.Close(websocket.StatusInternalError, "") - ctx := r.Context() + ctx, cancel := context.WithCancel(r.Context()) + defer cancel() events := make(chan *ClientEvent) go m.readEvents(c, ctx, events) + // Send a heartbeat ping to hold the connection open even if not streaming. + ping := time.NewTicker(15 * time.Second) + defer ping.Stop() + + // Close the connection if no operation has occurred after the idle timeout. + idleTimeout := 5 * time.Minute + idle := time.NewTimer(idleTimeout) + defer idle.Stop() + for { select { case <-ctx.Done(): @@ -160,9 +174,11 @@ func (m *ManagementService) logs(w http.ResponseWriter, r *http.Request) { case event := <-events: switch event.Type { case StartStreaming: + idle.Stop() m.startStreaming(c, ctx, event) continue case StopStreaming: + idle.Reset(idleTimeout) // TODO: limit StopStreaming to only halt streaming for clients that are already streaming m.streaming.Store(false) case UnknownClientEventType: @@ -176,6 +192,11 @@ func (m *ManagementService) logs(w http.ResponseWriter, r *http.Request) { } return } + case <-ping.C: + go c.Ping(ctx) + case <-idle.C: + c.Close(StatusIdleLimitExceeded, reasonIdleLimitExceeded) + return } } }