| |
| #include "bmc/scheduler_bmc.h" |
| |
| #include <chrono> // NOLINT(build/c++11) |
| #include <iostream> |
| #include <memory> |
| #include <optional> |
| #include <ostream> |
| #include <string> |
| #include <string_view> |
| #include <utility> |
| |
| #include "absl/base/attributes.h" |
| #include "absl/base/thread_annotations.h" |
| #include "absl/functional/any_invocable.h" |
| #include "absl/log/log.h" |
| #include "absl/status/status.h" |
| #include "absl/synchronization/mutex.h" |
| #include "absl/time/time.h" |
| |
| // NOLINTBEGIN(readability/boost): This file is only used for test in google3 |
| #include "boost/asio/io_context.hpp" |
| #include "boost/asio/io_service.hpp" |
| #include "boost/bind/bind.hpp" |
| #include "boost/date_time/posix_time/posix_time.hpp" |
| #include "boost/system/detail/errc.hpp" |
| #include "boost/system/detail/error_code.hpp" |
| // NOLINTEND(readability/boost) |
| |
| namespace safepower_agent { |
| |
| SchedulerBMC::SchedulerBMC( |
| boost::asio::io_context& io_in ABSL_ATTRIBUTE_LIFETIME_BOUND) |
| : io_(io_in), |
| work_guard_(boost::asio::executor_work_guard< |
| boost::asio::io_context::executor_type>(io_.get_executor())), |
| shutdown_called_(false) {} |
| |
| SchedulerBMC::~SchedulerBMC() { |
| if (!shutdown_called_) { |
| (void)Shutdown(); // the error has already been logged |
| } |
| while (!timer_by_name_.empty()) { |
| io_.poll(); |
| } |
| } |
| |
| absl::Status SchedulerBMC::Shutdown() { |
| // No new task in the event loop |
| absl::Status errc = CancelAll(); |
| if (!errc.ok()) { |
| LOG(WARNING) << "~Scheduler, unable to cancel all current timers" |
| << "error message:" << errc; |
| } |
| absl::MutexLock lock(&lock_); |
| |
| // allow the event loop function to return (io_->run()) |
| work_guard_.reset(); |
| shutdown_called_ = true; |
| return errc; |
| } |
| |
| void SchedulerBMC::Print() { |
| LOG(INFO) << "printing all names and timers"; |
| for (const auto& [name, timer] : timer_by_name_) { |
| LOG(INFO) << " name: " << name << " Addr:" << timer; |
| } |
| } |
| |
| // only SchedulerBMC->Run Task will run the following code |
| void SchedulerBMC::CallbackPeriodic( |
| std::string task_name, std::unique_ptr<boost::asio::steady_timer> timer, |
| std::chrono::milliseconds wait_duration, |
| std::shared_ptr<absl::AnyInvocable<void()>> fn) { |
| timer->expires_at(timer->expiry() + wait_duration); |
| timer->async_wait([captured_this = this, captured_name = std::move(task_name), |
| captured_timer = std::move(timer), wait_duration, |
| lambda_fn = fn](const boost::system::error_code& ec) |
| ABSL_LOCKS_EXCLUDED(lock_) mutable { |
| if (ec == boost::asio::error::operation_aborted) { |
| // this code path happens when the task is canceled |
| LOG(WARNING) << "CallBack lambda operation_aborted " |
| << captured_name; |
| absl::MutexLock lock(&captured_this->lock_); |
| captured_this->timer_by_name_.erase(captured_name); |
| return; |
| } |
| if (ec != boost::system::errc::errc_t::success) { |
| LOG(ERROR) |
| << "boost timer error value: " << ec.value() |
| << "name:" << ec.category().name(); |
| absl::MutexLock lock(&captured_this->lock_); |
| captured_this->timer_by_name_.erase(captured_name); |
| return; |
| } |
| captured_this->CallbackPeriodic( |
| captured_name, std::move(captured_timer), |
| wait_duration, std::move(lambda_fn)); |
| }); |
| (*fn)(); // running the task after async_wait allows the self cancel |
| } |
| |
| // API callers run this function |
| absl::Status SchedulerBMC::CancelAll() { |
| absl::MutexLock lock(&lock_); |
| for (auto it : timer_by_name_) { |
| it.second->cancel(); |
| std::cerr << "canceling timer named :" << it.first << std::endl; |
| } |
| return absl::OkStatus(); |
| } |
| |
| // API callers run this function |
| absl::Status SchedulerBMC::CancelCall(const std::string_view name) { |
| absl::MutexLock lock(&lock_); |
| // TODO when map is switched to absl flat_hash_map |
| // do not convert to string_view to string (heterogeneous-lookup) |
| std::string name_str(name); |
| auto it = timer_by_name_.find(name_str); |
| if (it == timer_by_name_.end()) { |
| LOG(ERROR) << "Task name not found " << name_str << " tasks_by_name_: "; |
| Print(); |
| return absl::NotFoundError("Task name not found"); |
| } |
| it->second->cancel(); |
| return absl::OkStatus(); |
| } |
| |
| // API callers run this function, the lambda function is run in the event loop |
| absl::Status SchedulerBMC::PeriodicCall(absl::AnyInvocable<void()> fn, |
| const absl::Duration interval, |
| const std::string_view name) { |
| if (shutdown_called_) { |
| return absl::FailedPreconditionError("Scheduler is shutting down"); |
| } |
| std::string name_str(name); |
| // create timer using io |
| auto timer_ptr = std::make_unique<boost::asio::steady_timer>(io_); |
| { |
| absl::MutexLock lock(&lock_); |
| |
| auto it = timer_by_name_.find(name_str); |
| if (it != timer_by_name_.end()) { |
| return absl::AlreadyExistsError("Task already exists"); |
| } |
| // add task to tasks_by_name_ |
| timer_by_name_[name_str] = timer_ptr.get(); |
| } |
| auto shared_fn = std::make_shared<absl::AnyInvocable<void()>>(std::move(fn)); |
| auto interval_chrono = absl::ToChronoMilliseconds(interval); |
| timer_ptr->expires_after(interval_chrono); |
| timer_ptr->async_wait([captured_name = std::move(name_str), |
| captured_timer = std::move(timer_ptr), interval_chrono, |
| lambda_fn = shared_fn, captured_this = this]( |
| const boost::system::error_code& |
| ec) ABSL_LOCKS_EXCLUDED(lock_) mutable { |
| if (ec != boost::system::errc::errc_t::success) { |
| LOG(WARNING) |
| << "PeriodicCall lambda operation_aborted or error before first call " |
| << ec.value() << "name:" << ec.category().name(); |
| absl::MutexLock lock(&captured_this->lock_); |
| captured_this->timer_by_name_.erase(captured_name); |
| return; |
| } |
| captured_this->CallbackPeriodic(captured_name, std::move(captured_timer), |
| interval_chrono, std::move(lambda_fn)); |
| }); |
| return absl::OkStatus(); |
| } |
| |
| // API callers run this function, the lambda function is run in the event loop |
| absl::Status SchedulerBMC::DelayCall(absl::AnyInvocable<void() &&> fn, |
| const absl::Duration delay, |
| const std::string_view name) { |
| if (shutdown_called_) { |
| return absl::FailedPreconditionError("Scheduler is shutting down"); |
| } |
| std::string name_str(name); |
| // create timer using io |
| auto timer_ptr = std::make_unique<boost::asio::steady_timer>(io_); |
| { |
| absl::MutexLock lock(&lock_); |
| |
| auto it = timer_by_name_.find(name_str); |
| // allow the running task to reschedule itself |
| if (it != timer_by_name_.end() && !(current_task_ == name_str)) { |
| return absl::AlreadyExistsError("Task already exists"); |
| } |
| // add task to tasks_by_name_ |
| timer_by_name_[name_str] = timer_ptr.get(); |
| } |
| timer_ptr->expires_after(absl::ToChronoMilliseconds(delay)); |
| timer_ptr->async_wait([captured_name = std::move(name_str), |
| lambda_fn = std::move(fn), captured_this = this]( |
| const boost::system::error_code& |
| ec) ABSL_LOCKS_EXCLUDED(lock_) mutable { |
| if (ec != boost::system::errc::errc_t::success) { |
| LOG(WARNING) |
| << "DelayCall lambda operation_aborted or error before first call " |
| << ec.value() << "name:" << ec.category().name(); |
| } |
| captured_this->current_task_ = captured_name; |
| std::move(lambda_fn)(); |
| captured_this->current_task_ = std::nullopt; |
| absl::MutexLock lock(&captured_this->lock_); |
| captured_this->timer_by_name_.erase(captured_name); |
| }); |
| |
| return absl::OkStatus(); |
| } |
| } // namespace safepower_agent |