| #include "rate_limiter.h" |
| |
| #include <algorithm> |
| #include <memory> |
| #include <optional> |
| |
| #include "absl/container/flat_hash_set.h" |
| #include "absl/log/log.h" |
| #include "absl/random/random.h" |
| #include "absl/strings/string_view.h" |
| #include "absl/synchronization/mutex.h" |
| #include "absl/time/time.h" |
| #include "time/clock.h" |
| #include "central_config.pb.h" |
| |
| namespace crow { |
| namespace internal { |
| |
| LeakyBucketWithRed::LeakyBucketWithRed( |
| const milotic_tlbmc::RedfishRateLimiterModule& config, |
| ecclesia::Clock* clock) |
| : clock_(clock), |
| leak_rate_(config.leak_rate_per_sec()), |
| capacity_(config.bucket_capacity()), |
| level_(0.0), |
| last_leak_time_(clock_->Now()), |
| enable_red_(config.red().enabled()), |
| red_min_th_(config.red().min_threshold()), |
| red_max_th_(config.red().max_threshold()), |
| red_max_p_(config.red().max_drop_prob()) {} |
| |
| void LeakyBucketWithRed::UpdateConfig( |
| const milotic_tlbmc::RedfishRateLimiterModule& config) { |
| absl::MutexLock lock(&mu_); |
| leak_rate_ = config.leak_rate_per_sec(); |
| capacity_ = config.bucket_capacity(); |
| enable_red_ = config.red().enabled(); |
| red_min_th_ = config.red().min_threshold(); |
| red_max_th_ = config.red().max_threshold(); |
| red_max_p_ = config.red().max_drop_prob(); |
| last_leak_time_ = clock_->Now(); |
| level_ = 0.0; |
| } |
| |
| std::optional<RateLimiterDropInfo> LeakyBucketWithRed::Admit() { |
| absl::MutexLock lock(&mu_); // NOLINT |
| Leak(); |
| |
| if (level_ + 1.0 > capacity_) { |
| // Bucket is full or will overflow |
| return RateLimiterDropInfo{.bucket_level = level_, |
| .bucket_capacity = capacity_, |
| .leak_rate_per_sec = leak_rate_}; |
| } |
| |
| if (enable_red_ && level_ > red_min_th_) { |
| double drop_prob = red_max_p_; |
| if (level_ < red_max_th_) { |
| // in [min_th, max_th], drop with linear probability |
| drop_prob = |
| red_max_p_ * (level_ - red_min_th_) / (red_max_th_ - red_min_th_); |
| } |
| if (absl::Uniform(bitgen_, 0.0, 1.0) < drop_prob) { |
| // RED drops |
| return RateLimiterDropInfo{.bucket_level = level_, |
| .bucket_capacity = capacity_, |
| .leak_rate_per_sec = leak_rate_}; |
| } |
| } |
| |
| // Admit request |
| level_ += 1.0; |
| return std::nullopt; |
| } |
| |
| void LeakyBucketWithRed::Leak() { |
| absl::Time now = clock_->Now(); |
| absl::Duration elapsed = now - last_leak_time_; |
| if (elapsed > absl::ZeroDuration()) { |
| double leaked = absl::ToDoubleSeconds(elapsed) * leak_rate_; |
| level_ = std::max(0.0, level_ - leaked); |
| } |
| last_leak_time_ = now; |
| } |
| |
| } // namespace internal |
| |
| RateLimiter::RateLimiter(const milotic_tlbmc::RedfishRateLimiterModule& config, |
| ecclesia::Clock* clock) |
| : bucket_(std::make_unique<internal::LeakyBucketWithRed>(config, clock)), |
| active_(config.active()) { |
| if (config.bypass_uri_size() > 0) { |
| bypass_uris_.insert(config.bypass_uri().begin(), config.bypass_uri().end()); |
| } |
| LOG(INFO) << "Rate limiter initialized with config: " << config; |
| } |
| |
| void RateLimiter::UpdateConfig( |
| const milotic_tlbmc::RedfishRateLimiterModule& config) { |
| { |
| absl::MutexLock lock(&mutex_); |
| active_ = config.active(); |
| bypass_uris_.clear(); |
| bypass_uris_.insert(config.bypass_uri().begin(), config.bypass_uri().end()); |
| } |
| bucket_->UpdateConfig(config); |
| } |
| |
| RateLimiter::~RateLimiter() = default; |
| |
| std::optional<RateLimiterDropInfo> RateLimiter::AdmitRequest( |
| absl::string_view uri) { |
| { |
| absl::ReaderMutexLock lock(&mutex_); |
| if (!active_ || bypass_uris_.contains(uri)) { |
| return std::nullopt; |
| } |
| } |
| return bucket_->Admit(); |
| } |
| |
| } // namespace crow |