package flow

import (
	"errors"
	"sync"
)

// unlimitedActiveFlows is the sentinel limit value: a limiter configured with
// this maximum does not cap the number of active flows.
const unlimitedActiveFlows = 0

// ErrTooManyActiveFlows is returned by Limiter.Acquire when no slot is
// available because the number of active flows has reached the limit.
var ErrTooManyActiveFlows = errors.New("too many active flows")

// Limiter caps how many flows may be active at the same time.
//
// The implementation returned by NewLimiter treats a limit of 0 as
// "no limit".
type Limiter interface {
	// Acquire claims a slot for a flow of the given type. It returns
	// ErrTooManyActiveFlows when the number of active flows has already
	// reached the limit.
	Acquire(flowType string) error
	// Release gives back a slot previously claimed for a flow.
	Release()
	// SetLimit hot swaps the maximum number of active flows.
	SetLimit(limit uint64)
}

// flowLimiter is a mutex-protected counter of active flows with a
// swappable upper bound.
type flowLimiter struct {
	// limiterLock is held by Acquire, Release and SetLimit while they read
	// or write the fields below.
	limiterLock sync.Mutex

	// activeFlowsCounter is the number of slots acquired and not yet
	// released.
	activeFlowsCounter uint64

	// maxActiveFlows is the bound Acquire checks the counter against; it is
	// replaced by SetLimit.
	maxActiveFlows uint64

	// unlimited holds isUnlimited(maxActiveFlows). When true, Acquire skips
	// the bound check.
	unlimited bool
}

// NewLimiter returns a Limiter that admits at most maxActiveFlows flows at
// once. A maxActiveFlows of 0 (unlimitedActiveFlows) means no limit is
// enforced.
func NewLimiter(maxActiveFlows uint64) Limiter {
	return &flowLimiter{
		maxActiveFlows: maxActiveFlows,
		unlimited:      isUnlimited(maxActiveFlows),
	}
}

// Acquire claims one slot for a flow of type flowType.
//
// When the limiter is not in unlimited mode and the active-flow counter has
// reached maxActiveFlows, no slot is taken: the flowRegistrationsDropped
// metric for flowType is incremented and ErrTooManyActiveFlows is returned.
// Otherwise the counter is incremented and nil is returned.
func (s *flowLimiter) Acquire(flowType string) error {
	s.limiterLock.Lock()
	defer s.limiterLock.Unlock()

	// A slot is available when limiting is off or the counter is still
	// strictly below the configured maximum.
	hasRoom := s.unlimited || s.activeFlowsCounter < s.maxActiveFlows
	if !hasRoom {
		flowRegistrationsDropped.WithLabelValues(flowType).Inc()
		return ErrTooManyActiveFlows
	}

	s.activeFlowsCounter++
	return nil
}

// Release gives back one slot. Calling it when no slot is held is a no-op.
func (s *flowLimiter) Release() {
	s.limiterLock.Lock()
	defer s.limiterLock.Unlock()

	// The counter is unsigned, so decrementing it at zero would wrap around;
	// only decrement while at least one slot is held.
	if s.activeFlowsCounter > 0 {
		s.activeFlowsCounter--
	}
}

// SetLimit replaces the maximum number of active flows with
// newMaxActiveFlows and refreshes the unlimited flag to match. The
// active-flow counter is left untouched.
func (s *flowLimiter) SetLimit(newMaxActiveFlows uint64) {
	// isUnlimited depends only on its argument, so it is evaluated before
	// taking the lock.
	noLimit := isUnlimited(newMaxActiveFlows)

	s.limiterLock.Lock()
	defer s.limiterLock.Unlock()

	s.unlimited = noLimit
	s.maxActiveFlows = newMaxActiveFlows
}

// isUnlimited reports whether value equals unlimitedActiveFlows, the sentinel
// limit that configures the flow limiter to run without a cap.
func isUnlimited(value uint64) bool {
	return value == unlimitedActiveFlows
}