// blob: 7d97baf7597258f69eb36495c2974f1c13d772f9 [file] [log] [blame] [edit]
#include "rate_limiter.h"
#include <algorithm>
#include <memory>
#include <optional>
#include "absl/container/flat_hash_set.h"
#include "absl/log/log.h"
#include "absl/random/random.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "time/clock.h"
#include "central_config.pb.h"
namespace crow {
namespace internal {
// Builds a leaky bucket from `config`.
//
// The bucket starts empty (level 0.0) and the leak timestamp is set to the
// clock's current time. Leak rate, capacity and the RED parameters (enabled
// flag, min/max thresholds, max drop probability) are copied from `config`.
//
// `clock` is stored as a raw pointer and dereferenced on later calls, so it
// presumably must outlive this object -- confirm against the header contract.
LeakyBucketWithRed::LeakyBucketWithRed(
    const milotic_tlbmc::RedfishRateLimiterModule& config,
    ecclesia::Clock* clock)
    : clock_(clock),
      leak_rate_(config.leak_rate_per_sec()),
      capacity_(config.bucket_capacity()),
      level_(0.0),
      // Read the time through the `clock` parameter rather than the `clock_`
      // member: members are initialised in declaration order, so going
      // through `clock_` would only be valid if it happens to be declared
      // before `last_leak_time_` in the class.
      last_leak_time_(clock->Now()),
      enable_red_(config.red().enabled()),
      red_min_th_(config.red().min_threshold()),
      red_max_th_(config.red().max_threshold()),
      red_max_p_(config.red().max_drop_prob()) {}
// Replaces every tunable of the bucket with the values from `config` and
// restarts the bucket: the fill level is reset to zero and the leak timestamp
// is set to the clock's current time. All writes happen while holding `mu_`.
void LeakyBucketWithRed::UpdateConfig(
    const milotic_tlbmc::RedfishRateLimiterModule& config) {
  const auto& red_config = config.red();

  absl::MutexLock lock(&mu_);

  // Bucket shape.
  capacity_ = config.bucket_capacity();
  leak_rate_ = config.leak_rate_per_sec();

  // Random-early-drop parameters.
  enable_red_ = red_config.enabled();
  red_min_th_ = red_config.min_threshold();
  red_max_th_ = red_config.max_threshold();
  red_max_p_ = red_config.max_drop_prob();

  // Start over with an empty bucket measured from "now".
  level_ = 0.0;
  last_leak_time_ = clock_->Now();
}
// Decides whether one more request fits into the bucket.
//
// The bucket is first drained for the time elapsed since the previous call.
// The request is then rejected if adding one unit would exceed the capacity,
// or -- when RED is enabled and the level is above the minimum threshold --
// with a probability that grows linearly from 0 at the minimum threshold to
// `red_max_p_` at the maximum threshold and stays at `red_max_p_` beyond it.
//
// Returns std::nullopt when the request is admitted (the level is increased
// by one), otherwise a RateLimiterDropInfo holding the current level,
// capacity and leak rate.
std::optional<RateLimiterDropInfo> LeakyBucketWithRed::Admit() {
  absl::MutexLock lock(&mu_);  // NOLINT
  Leak();

  // Hard limit: the bucket is full or this request would overflow it.
  bool rejected = level_ + 1.0 > capacity_;

  // Soft limit: random early drop. A random number is drawn only on this path.
  if (!rejected && enable_red_ && level_ > red_min_th_) {
    const double drop_prob =
        level_ < red_max_th_
            ? red_max_p_ * (level_ - red_min_th_) / (red_max_th_ - red_min_th_)
            : red_max_p_;
    rejected = absl::Uniform(bitgen_, 0.0, 1.0) < drop_prob;
  }

  if (rejected) {
    return RateLimiterDropInfo{.bucket_level = level_,
                               .bucket_capacity = capacity_,
                               .leak_rate_per_sec = leak_rate_};
  }

  // Admitted: account for the request.
  level_ += 1.0;
  return std::nullopt;
}
void LeakyBucketWithRed::Leak() {
absl::Time now = clock_->Now();
absl::Duration elapsed = now - last_leak_time_;
if (elapsed > absl::ZeroDuration()) {
double leaked = absl::ToDoubleSeconds(elapsed) * leak_rate_;
level_ = std::max(0.0, level_ - leaked);
}
last_leak_time_ = now;
}
} // namespace internal
// Creates a rate limiter backed by a LeakyBucketWithRed built from `config`
// and `clock`. The active flag and the set of bypass URIs are taken from
// `config`, and the full config is logged at INFO level.
RateLimiter::RateLimiter(const milotic_tlbmc::RedfishRateLimiterModule& config,
                         ecclesia::Clock* clock)
    : bucket_(std::make_unique<internal::LeakyBucketWithRed>(config, clock)),
      active_(config.active()) {
  const auto& uris = config.bypass_uri();
  if (!uris.empty()) {
    bypass_uris_.insert(uris.begin(), uris.end());
  }

  LOG(INFO) << "Rate limiter initialized with config: " << config;
}
// Applies a new configuration. The active flag and bypass URI set are
// replaced while holding `mutex_`; the bucket is reconfigured afterwards,
// once that lock has been released.
void RateLimiter::UpdateConfig(
    const milotic_tlbmc::RedfishRateLimiterModule& config) {
  const auto& uris = config.bypass_uri();

  {
    absl::MutexLock lock(&mutex_);
    active_ = config.active();
    bypass_uris_.clear();
    bypass_uris_.insert(uris.begin(), uris.end());
  }

  bucket_->UpdateConfig(config);
}
// Destructor is defaulted here in the implementation file; it performs no
// work beyond destroying the members.
RateLimiter::~RateLimiter() = default;
// Decides whether the request for `uri` may proceed.
//
// Returns std::nullopt (admit) without touching the bucket when the limiter
// is inactive or `uri` is in the bypass set. Otherwise returns the bucket's
// verdict: std::nullopt to admit, or a RateLimiterDropInfo describing the
// drop.
std::optional<RateLimiterDropInfo> RateLimiter::AdmitRequest(
    absl::string_view uri) {
  bool skip_bucket = false;
  {
    // The reader lock covers only the config lookup; the bucket call below
    // runs after it has been released.
    absl::ReaderMutexLock lock(&mutex_);
    skip_bucket = !active_ || bypass_uris_.contains(uri);
  }

  if (skip_bucket) {
    return std::nullopt;
  }
  return bucket_->Admit();
}
} // namespace crow