cloudflared-mirror/flow/limiter.go

78 lines
1.6 KiB
Go

package flow
import (
"errors"
"sync"
)
// unlimitedActiveFlows is the limit value that isUnlimited treats as
// "no limit configured".
const unlimitedActiveFlows = 0
// ErrTooManyActiveFlows is the sentinel error reported when a flow cannot be
// admitted because the limit on active flows has been reached.
var ErrTooManyActiveFlows = errors.New("too many active flows")
// Limiter caps the number of flows that may be active at the same time.
type Limiter interface {
	// Acquire reserves a slot for a flow of the given type. It returns
	// ErrTooManyActiveFlows when the number of active flows has already
	// reached the maximum.
	Acquire(flowType string) error
	// Release gives back a slot previously reserved for a flow.
	Release()
	// SetLimit hot swaps the maximum number of active flows.
	SetLimit(limit uint64)
}
// flowLimiter is the Limiter handed out by NewLimiter. It keeps a count of the
// flows currently active and checks it against a configurable maximum.
type flowLimiter struct {
	// limiterLock is held by Acquire, Release and SetLimit while they read or
	// write the fields below.
	limiterLock sync.Mutex

	// activeFlowsCounter is the number of slots currently acquired.
	activeFlowsCounter uint64
	// maxActiveFlows is the configured maximum number of active flows.
	maxActiveFlows uint64
	// unlimited caches isUnlimited(maxActiveFlows); when true, Acquire does
	// not enforce the maximum.
	unlimited bool
}
// NewLimiter builds a Limiter whose maximum number of active flows is
// maxActiveFlows. The limiter starts with no active flows. Whether the maximum
// is enforced at all is decided by isUnlimited(maxActiveFlows).
func NewLimiter(maxActiveFlows uint64) Limiter {
	return &flowLimiter{
		maxActiveFlows: maxActiveFlows,
		unlimited:      isUnlimited(maxActiveFlows),
	}
}
// Acquire reserves one slot for a flow of type flowType.
//
// When a limit is in force and the active-flow count has already reached it,
// the attempt is counted in flowRegistrationsDropped under the flowType label
// and ErrTooManyActiveFlows is returned. Otherwise the active-flow count grows
// by one and nil is returned.
func (s *flowLimiter) Acquire(flowType string) error {
	s.limiterLock.Lock()
	defer s.limiterLock.Unlock()

	// A slot is available when limiting is switched off or the count is still
	// below the maximum.
	hasRoom := s.unlimited || s.activeFlowsCounter < s.maxActiveFlows
	if !hasRoom {
		flowRegistrationsDropped.WithLabelValues(flowType).Inc()
		return ErrTooManyActiveFlows
	}

	s.activeFlowsCounter++
	return nil
}
// Release gives back one slot. Calling it while no flows are active is a
// no-op, so the active-flow count never goes below zero.
func (s *flowLimiter) Release() {
	s.limiterLock.Lock()
	defer s.limiterLock.Unlock()

	// The counter is unsigned: only decrement when there is something to
	// release, otherwise it would wrap around.
	if s.activeFlowsCounter > 0 {
		s.activeFlowsCounter--
	}
}
// SetLimit replaces the maximum number of active flows with newMaxActiveFlows
// and recomputes whether the limiter is unlimited. The current active-flow
// count is left as it is.
func (s *flowLimiter) SetLimit(newMaxActiveFlows uint64) {
	s.limiterLock.Lock()
	defer s.limiterLock.Unlock()

	s.maxActiveFlows, s.unlimited = newMaxActiveFlows, isUnlimited(newMaxActiveFlows)
}
// isUnlimited reports whether value equals unlimitedActiveFlows, the limit
// value that configures an unlimited flow limiter.
func isUnlimited(value uint64) bool {
	const sentinel uint64 = unlimitedActiveFlows
	return value == sentinel
}