cloudflared-mirror/h2mux/readylist.go

152 lines
3.4 KiB
Go
Raw Permalink Normal View History

package h2mux
import "sync"
// ReadyList multiplexes several event signals onto a single channel.
type ReadyList struct {
// signalC is used to signal that a stream can be enqueued
signalC chan uint32
// waitC is used to signal the ID of the first ready descriptor
waitC chan uint32
// doneC is used to signal that run should terminate
doneC chan struct{}
closeOnce sync.Once
}
func NewReadyList() *ReadyList {
rl := &ReadyList{
signalC: make(chan uint32),
waitC: make(chan uint32),
doneC: make(chan struct{}),
}
go rl.run()
return rl
}
// ID is the stream ID
func (r *ReadyList) Signal(ID uint32) {
select {
case r.signalC <- ID:
// ReadyList already closed
case <-r.doneC:
}
}
func (r *ReadyList) ReadyChannel() <-chan uint32 {
return r.waitC
}
func (r *ReadyList) Close() {
r.closeOnce.Do(func() {
close(r.doneC)
})
}
func (r *ReadyList) run() {
defer close(r.waitC)
var queue readyDescriptorQueue
var firstReady *readyDescriptor
activeDescriptors := newReadyDescriptorMap()
for {
if firstReady == nil {
select {
case i := <-r.signalC:
firstReady = activeDescriptors.SetIfMissing(i)
case <-r.doneC:
return
}
}
select {
case r.waitC <- firstReady.ID:
activeDescriptors.Delete(firstReady.ID)
firstReady = queue.Dequeue()
case i := <-r.signalC:
newReady := activeDescriptors.SetIfMissing(i)
if newReady != nil {
// key doesn't exist
queue.Enqueue(newReady)
}
case <-r.doneC:
return
}
}
}
type readyDescriptor struct {
ID uint32
Next *readyDescriptor
}
// readyDescriptorQueue is a queue of readyDescriptors in the form of a singly-linked list.
// The nil readyDescriptorQueue is an empty queue ready for use.
type readyDescriptorQueue struct {
Head *readyDescriptor
Tail *readyDescriptor
}
func (q *readyDescriptorQueue) Empty() bool {
return q.Head == nil
}
func (q *readyDescriptorQueue) Enqueue(x *readyDescriptor) {
if x.Next != nil {
panic("enqueued already queued item")
}
if q.Empty() {
q.Head = x
q.Tail = x
} else {
q.Tail.Next = x
q.Tail = x
}
}
// Dequeue returns the first readyDescriptor in the queue, or nil if empty.
func (q *readyDescriptorQueue) Dequeue() *readyDescriptor {
if q.Empty() {
return nil
}
x := q.Head
q.Head = x.Next
x.Next = nil
return x
}
// readyDescriptorQueue is a map of readyDescriptors keyed by ID.
// It maintains a free list of deleted ready descriptors.
type readyDescriptorMap struct {
descriptors map[uint32]*readyDescriptor
free []*readyDescriptor
}
func newReadyDescriptorMap() *readyDescriptorMap {
return &readyDescriptorMap{descriptors: make(map[uint32]*readyDescriptor)}
}
// create or reuse a readyDescriptor if the stream is not in the queue.
// This avoid stream starvation caused by a single high-bandwidth stream monopolising the writer goroutine
func (m *readyDescriptorMap) SetIfMissing(key uint32) *readyDescriptor {
if _, ok := m.descriptors[key]; ok {
return nil
}
var newDescriptor *readyDescriptor
if len(m.free) > 0 {
// reuse deleted ready descriptors
newDescriptor = m.free[len(m.free)-1]
m.free = m.free[:len(m.free)-1]
} else {
newDescriptor = &readyDescriptor{}
}
newDescriptor.ID = key
m.descriptors[key] = newDescriptor
return newDescriptor
}
func (m *readyDescriptorMap) Delete(key uint32) {
if descriptor, ok := m.descriptors[key]; ok {
m.free = append(m.free, descriptor)
delete(m.descriptors, key)
}
}