78 lines
1.6 KiB
Go
78 lines
1.6 KiB
Go
|
package flow
|
||
|
|
||
|
import (
|
||
|
"errors"
|
||
|
"sync"
|
||
|
)
|
||
|
|
||
|
// unlimitedActiveFlows is the limit value that switches limiting off: a limit
// equal to it is treated as "no maximum" by isUnlimited.
const unlimitedActiveFlows = 0
|
||
|
|
||
|
// ErrTooManyActiveFlows is the sentinel error reported when a flow cannot get
// a slot because the number of active flows has reached the maximum.
var ErrTooManyActiveFlows = errors.New("too many active flows")
|
||
|
|
||
|
// Limiter bounds how many flows may be active at the same time.
type Limiter interface {
	// Acquire reserves a slot for a flow; flowType names the kind of flow
	// asking for it. It returns ErrTooManyActiveFlows when the number of
	// active flows has already reached the maximum.
	Acquire(flowType string) error
	// Release gives back a slot that was reserved for a flow.
	Release()
	// SetLimit swaps the maximum number of active flows while the limiter
	// is in use.
	SetLimit(uint64)
}
|
||
|
|
||
|
// flowLimiter is the Limiter implementation handed out by NewLimiter. It keeps
// a count of active flows and compares it against a configurable maximum.
type flowLimiter struct {
	// limiterLock is taken by Acquire, Release and SetLimit before they
	// read or write the fields below.
	limiterLock sync.Mutex
	// activeFlowsCounter is the number of slots currently held.
	activeFlowsCounter uint64
	// maxActiveFlows is the configured ceiling for activeFlowsCounter.
	maxActiveFlows uint64
	// unlimited records whether maxActiveFlows is the "no limit" value;
	// when true the ceiling is not enforced.
	unlimited bool
}
|
||
|
|
||
|
func NewLimiter(maxActiveFlows uint64) Limiter {
|
||
|
flowLimiter := &flowLimiter{
|
||
|
maxActiveFlows: maxActiveFlows,
|
||
|
unlimited: isUnlimited(maxActiveFlows),
|
||
|
}
|
||
|
|
||
|
return flowLimiter
|
||
|
}
|
||
|
|
||
|
func (s *flowLimiter) Acquire(flowType string) error {
|
||
|
s.limiterLock.Lock()
|
||
|
defer s.limiterLock.Unlock()
|
||
|
|
||
|
if !s.unlimited && s.activeFlowsCounter >= s.maxActiveFlows {
|
||
|
flowRegistrationsDropped.WithLabelValues(flowType).Inc()
|
||
|
return ErrTooManyActiveFlows
|
||
|
}
|
||
|
|
||
|
s.activeFlowsCounter++
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *flowLimiter) Release() {
|
||
|
s.limiterLock.Lock()
|
||
|
defer s.limiterLock.Unlock()
|
||
|
|
||
|
if s.activeFlowsCounter <= 0 {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
s.activeFlowsCounter--
|
||
|
}
|
||
|
|
||
|
func (s *flowLimiter) SetLimit(newMaxActiveFlows uint64) {
|
||
|
s.limiterLock.Lock()
|
||
|
defer s.limiterLock.Unlock()
|
||
|
|
||
|
s.maxActiveFlows = newMaxActiveFlows
|
||
|
s.unlimited = isUnlimited(newMaxActiveFlows)
|
||
|
}
|
||
|
|
||
|
// isUnlimited checks if the value received matches the configuration for the unlimited flow limiter.
|
||
|
func isUnlimited(value uint64) bool {
|
||
|
return value == unlimitedActiveFlows
|
||
|
}
|