blob: 32077a515d68dc804258d4dc67096d7f37eb1c21 [file] [log] [blame]
#include "bmc/scheduler_bmc.h"
#include <chrono> // NOLINT(build/c++11)
#include <iostream>
#include <memory>
#include <optional>
#include <ostream>
#include <string>
#include <string_view>
#include <utility>
#include "absl/base/attributes.h"
#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
// NOLINTBEGIN(readability/boost): This file is only used for test in google3
#include "boost/asio/io_context.hpp"
#include "boost/asio/io_service.hpp"
#include "boost/bind/bind.hpp"
#include "boost/date_time/posix_time/posix_time.hpp"
#include "boost/system/detail/errc.hpp"
#include "boost/system/detail/error_code.hpp"
// NOLINTEND(readability/boost)
namespace safepower_agent {
// Builds a scheduler on top of the caller-owned io_context `io_in`.
// `io_in` is lifetime-bound: it must outlive this scheduler. A work guard is
// taken on its executor and held until Shutdown() resets it, which is what
// allows the event loop function (io_context::run()) to return.
SchedulerBMC::SchedulerBMC(
    boost::asio::io_context& io_in ABSL_ATTRIBUTE_LIFETIME_BOUND)
    : io_(io_in),
      // Direct-initialize the guard from the io_context's executor.
      work_guard_(io_.get_executor()),
      shutdown_called_(false) {}
// Shuts the scheduler down if the owner has not already done so, then drives
// the event loop until every timer handler has removed its entry from
// timer_by_name_. The handlers capture `this`, so they have to finish before
// the object goes away.
SchedulerBMC::~SchedulerBMC() {
  if (!shutdown_called_) {
    // Shutdown() logs its own failure, so the returned status is dropped here.
    static_cast<void>(Shutdown());
  }
  // NOTE(review): entries are only erased from inside timer handlers, which
  // run from io_.poll(). If io_ has been stopped, poll() presumably returns
  // without running anything and this loop would not terminate -- confirm
  // that owners never stop io_ before destroying the scheduler.
  while (true) {
    if (timer_by_name_.empty()) {
      break;
    }
    io_.poll();
  }
}
// Cancels all registered timers, releases the work guard and marks the
// scheduler as shut down so that PeriodicCall()/DelayCall() reject new work.
// Returns whatever CancelAll() returned; a failure is logged but does not
// prevent the rest of the shutdown from happening.
absl::Status SchedulerBMC::Shutdown() {
  // CancelAll() acquires lock_ on its own, so it must complete before lock_
  // is taken further down.
  absl::Status cancel_status = CancelAll();
  const bool cancel_failed = !cancel_status.ok();
  if (cancel_failed) {
    LOG(WARNING) << "~Scheduler, unable to cancel all current timers"
                 << "error message:" << cancel_status;
  }

  absl::MutexLock lock(&lock_);
  // Dropping the guard lets the event loop function (io_->run()) return once
  // the remaining handlers have run.
  work_guard_.reset();
  shutdown_called_ = true;
  return cancel_status;
}
// Logs every registered task name together with the address of its timer.
// This function does not take lock_ itself; CancelCall() invokes it while
// already holding lock_.
void SchedulerBMC::Print() {
  LOG(INFO) << "printing all names and timers";
  for (auto entry = timer_by_name_.begin(); entry != timer_by_name_.end();
       ++entry) {
    LOG(INFO) << " name: " << entry->first << " Addr:" << entry->second;
  }
}
// only SchedulerBMC->Run Task will run the following code
void SchedulerBMC::CallbackPeriodic(
std::string task_name, std::unique_ptr<boost::asio::steady_timer> timer,
std::chrono::milliseconds wait_duration,
std::shared_ptr<absl::AnyInvocable<void()>> fn) {
timer->expires_at(timer->expiry() + wait_duration);
timer->async_wait([captured_this = this, captured_name = std::move(task_name),
captured_timer = std::move(timer), wait_duration,
lambda_fn = fn](const boost::system::error_code& ec)
ABSL_LOCKS_EXCLUDED(lock_) mutable {
if (ec == boost::asio::error::operation_aborted) {
// this code path happens when the task is canceled
LOG(WARNING) << "CallBack lambda operation_aborted "
<< captured_name;
absl::MutexLock lock(&captured_this->lock_);
captured_this->timer_by_name_.erase(captured_name);
return;
}
if (ec != boost::system::errc::errc_t::success) {
LOG(ERROR)
<< "boost timer error value: " << ec.value()
<< "name:" << ec.category().name();
absl::MutexLock lock(&captured_this->lock_);
captured_this->timer_by_name_.erase(captured_name);
return;
}
captured_this->CallbackPeriodic(
captured_name, std::move(captured_timer),
wait_duration, std::move(lambda_fn));
});
(*fn)(); // running the task after async_wait allows the self cancel
}
// API callers run this function
//
// Requests cancellation of every registered timer while holding lock_.
// Entries are not removed from timer_by_name_ here; each timer's completion
// handler erases its own entry when it runs on the event loop.
// Always returns OkStatus.
absl::Status SchedulerBMC::CancelAll() {
  absl::MutexLock lock(&lock_);
  // Iterate by const reference: taking each element by value would copy the
  // task name string for every timer while the lock is held.
  for (const auto& [name, timer] : timer_by_name_) {
    timer->cancel();
    std::cerr << "canceling timer named :" << name << std::endl;
  }
  return absl::OkStatus();
}
// API callers run this function
//
// Requests cancellation of the timer registered under `name`.
// Returns OkStatus when the task exists, NotFoundError otherwise (after
// logging the currently registered tasks). The map entry itself is removed
// later by the timer's completion handler.
absl::Status SchedulerBMC::CancelCall(const std::string_view name) {
  absl::MutexLock lock(&lock_);
  // TODO when map is switched to absl flat_hash_map
  // do not convert string_view to string (heterogeneous-lookup)
  const std::string key(name);
  if (const auto found = timer_by_name_.find(key);
      found != timer_by_name_.end()) {
    found->second->cancel();
    return absl::OkStatus();
  }
  LOG(ERROR) << "Task name not found " << key << " tasks_by_name_: ";
  Print();  // lock_ is already held here; Print() does not lock.
  return absl::NotFoundError("Task name not found");
}
// API callers run this function, the lambda function is run in the event loop
//
// Registers `fn` under `name` to be invoked every `interval` (converted to
// whole milliseconds); the first invocation happens one interval from now.
// Returns FailedPreconditionError once shutdown has been requested,
// AlreadyExistsError when `name` is already registered, OkStatus otherwise.
absl::Status SchedulerBMC::PeriodicCall(absl::AnyInvocable<void()> fn,
                                        const absl::Duration interval,
                                        const std::string_view name) {
  if (shutdown_called_) {
    return absl::FailedPreconditionError("Scheduler is shutting down");
  }
  std::string task_key(name);
  // The timer is bound to io; its ownership ends up inside the handler, so
  // keep a raw pointer for the registry and for arming it.
  auto owned_timer = std::make_unique<boost::asio::steady_timer>(io_);
  boost::asio::steady_timer* const raw_timer = owned_timer.get();
  {
    absl::MutexLock lock(&lock_);
    // Inserts only when the name is free; an existing entry is left alone.
    const bool inserted = timer_by_name_.emplace(task_key, raw_timer).second;
    if (!inserted) {
      return absl::AlreadyExistsError("Task already exists");
    }
  }
  const auto period = absl::ToChronoMilliseconds(interval);
  raw_timer->expires_after(period);
  raw_timer->async_wait(
      [captured_name = std::move(task_key),
       captured_timer = std::move(owned_timer), period,
       // The task is shared because every period needs to invoke it again.
       lambda_fn =
           std::make_shared<absl::AnyInvocable<void()>>(std::move(fn)),
       captured_this = this](const boost::system::error_code& ec)
          ABSL_LOCKS_EXCLUDED(lock_) mutable {
        if (ec != boost::system::errc::errc_t::success) {
          LOG(WARNING) << "PeriodicCall lambda operation_aborted or error "
                          "before first call "
                       << ec.value() << "name:" << ec.category().name();
          // Cancelled or failed before the first run: just unregister.
          absl::MutexLock lock(&captured_this->lock_);
          captured_this->timer_by_name_.erase(captured_name);
          return;
        }
        // First expiry: from here on CallbackPeriodic re-arms itself.
        captured_this->CallbackPeriodic(captured_name,
                                        std::move(captured_timer), period,
                                        std::move(lambda_fn));
      });
  return absl::OkStatus();
}
// API callers run this function, the lambda function is run in the event loop
//
// Registers `fn` under `name` to be invoked exactly once, `delay` (converted
// to whole milliseconds) from now.
// Returns FailedPreconditionError once shutdown has been requested,
// AlreadyExistsError when `name` is already registered (unless the task that
// is currently running re-registers its own name), OkStatus otherwise.
// If the wait is cancelled or fails, `fn` is NOT invoked; the task is simply
// unregistered.
absl::Status SchedulerBMC::DelayCall(absl::AnyInvocable<void() &&> fn,
                                     const absl::Duration delay,
                                     const std::string_view name) {
  if (shutdown_called_) {
    return absl::FailedPreconditionError("Scheduler is shutting down");
  }
  std::string name_str(name);
  // create timer using io
  auto timer_ptr = std::make_unique<boost::asio::steady_timer>(io_);
  // Ownership of the timer moves into the handler below; the registry and
  // the arming calls go through this raw pointer.
  boost::asio::steady_timer* const raw_timer = timer_ptr.get();
  {
    absl::MutexLock lock(&lock_);
    auto it = timer_by_name_.find(name_str);
    // allow the running task to reschedule itself
    if (it != timer_by_name_.end() && !(current_task_ == name_str)) {
      return absl::AlreadyExistsError("Task already exists");
    }
    // add task to tasks_by_name_ (replaces the entry when rescheduling)
    timer_by_name_[name_str] = raw_timer;
  }
  raw_timer->expires_after(absl::ToChronoMilliseconds(delay));
  raw_timer->async_wait(
      [captured_name = std::move(name_str),
       // The handler must own the timer: destroying the timer when this
       // function returns would abort the wait immediately and leave a
       // dangling pointer in timer_by_name_.
       captured_timer = std::move(timer_ptr), lambda_fn = std::move(fn),
       captured_this = this](const boost::system::error_code& ec)
          ABSL_LOCKS_EXCLUDED(lock_) mutable {
        if (ec != boost::system::errc::errc_t::success) {
          LOG(WARNING) << "DelayCall lambda operation_aborted or error "
                          "before first call "
                       << ec.value() << "name:" << ec.category().name();
          // Cancelled or failed: unregister without running the task.
          absl::MutexLock lock(&captured_this->lock_);
          auto entry = captured_this->timer_by_name_.find(captured_name);
          if (entry != captured_this->timer_by_name_.end() &&
              entry->second == captured_timer.get()) {
            captured_this->timer_by_name_.erase(entry);
          }
          return;
        }
        {
          // current_task_ is read under lock_ in DelayCall(), so publish it
          // under the same lock. The lock is released before the task runs
          // because the task may call back into the scheduler.
          absl::MutexLock lock(&captured_this->lock_);
          captured_this->current_task_ = captured_name;
        }
        std::move(lambda_fn)();
        absl::MutexLock lock(&captured_this->lock_);
        captured_this->current_task_ = std::nullopt;
        // If the task rescheduled itself, the entry now points at the new
        // timer and has to stay registered; only remove our own entry.
        auto entry = captured_this->timer_by_name_.find(captured_name);
        if (entry != captured_this->timer_by_name_.end() &&
            entry->second == captured_timer.get()) {
          captured_this->timer_by_name_.erase(entry);
        }
      });
  return absl::OkStatus();
}
} // namespace safepower_agent