| #include "tlbmc/scheduler/scheduler.h" |
| |
| #include <algorithm> |
| #include <cstddef> |
| #include <memory> |
| #include <optional> |
| #include <thread> |
| #include <utility> |
| #include <vector> |
| |
| #include "absl/container/flat_hash_map.h" |
| #include "absl/functional/any_invocable.h" |
| #include "absl/log/log.h" |
| #include "absl/synchronization/mutex.h" |
| #include "absl/time/time.h" |
| #include "boost/asio/executor.hpp" |
| #include "boost/asio/executor_work_guard.hpp" |
| #include "time/clock.h" |
| #include "nlohmann/json.hpp" |
| |
| namespace milotic_tlbmc { |
| |
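| // Serializes the task's timing record. Only the 10 most recent wait times |
| // are emitted, newest first. |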
| nlohmann::json TimeInfo::ToJson() const { |
| nlohmann::json json; |
| json["last_scheduled_time"] = absl::FormatTime(last_scheduled_time); |
| json["last_run_time"] = absl::FormatTime(last_run_time); |
| // Get the last 10 wait times. |
| auto it = wait_times.rbegin(); |
| auto& wait_times_recent_to_oldest = json["wait_times_recent_to_oldest"]; |
| wait_times_recent_to_oldest = nlohmann::json::array(); |
| for (int i = 0; i < 10 && it != wait_times.rend(); ++i, ++it) { |
| wait_times_recent_to_oldest.push_back(absl::FormatDuration(*it)); |
| } |
| return json; |
| } |
| |
| nlohmann::json Task::ToJson() const { |
| nlohmann::json json; |
| json["task_id"] = task_id_; |
| json["period"] = absl::FormatDuration(period_); |
| json["execution_count"] = execution_count_.load(); |
| { |
| absl::MutexLock lock(&time_info_mutex_); |
| json["time_info"] = time_info_.ToJson(); |
| } |
| json["average_wait_time"] = absl::FormatDuration(GetAverageWaitTime()); |
| json["cancelled"] = cancelled_.load(); |
| json["task_mode"] = task_mode_ == TaskMode::kPeriodic ? "periodic" : "once"; |
| return json; |
| } |
| |
| // Constructor overload for tasks that must be invoked with an ack callback. |
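| // Illustrative construction (a sketch; `io`, `clock`, and DoWork() are |
| // placeholders assumed for the example, not defined in this file): |
| // |
| //   auto task = std::make_shared<Task>( |
| //       /*task_id=*/1, |
| //       [](absl::AnyInvocable<void()> ack) { |
| //         DoWork(); |
| //         ack();  // Signals completion so the next run can be scheduled. |
| //       }, |
| //       absl::Seconds(5), io, absl::Minutes(1), TaskMode::kPeriodic, clock); |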
| Task::Task(int task_id, |
| absl::AnyInvocable<void(absl::AnyInvocable<void()>)> task_fn, |
| absl::Duration period, boost::asio::io_context* io, |
| absl::Duration task_timeout, TaskMode task_mode, |
| ecclesia::Clock* clock) |
| : task_id_(task_id), |
| task_mode_(task_mode), |
| clock_(clock), |
| period_(period), |
| cancelled_(false), |
| task_fn_with_ack_(std::move(task_fn)), |
| timer_(std::make_unique<boost::asio::steady_timer>(*io)), |
| task_timeout_(task_timeout) {} |
| |
| absl::Duration Task::GetPeriod() const { return period_.load(); } |
| |
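| // Returns the mean of every wait time recorded so far (the full history, |
| // not just the recent samples surfaced in ToJson()), or InfiniteDuration() |
| // if the task has not yet completed a scheduled run. |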
| absl::Duration Task::GetAverageWaitTime() const { |
| absl::MutexLock lock(&time_info_mutex_); |
| if (time_info_.wait_times.empty()) { |
| // No wait times have been recorded yet. This happens before the task has |
| // completed its first scheduled run. |
| return absl::InfiniteDuration(); |
| } |
| absl::Duration total_wait_time = absl::ZeroDuration(); |
| for (const auto& wait_time : time_info_.wait_times) { |
| total_wait_time += wait_time; |
| } |
| return total_wait_time / time_info_.wait_times.size(); |
| } |
| |
| absl::Time Task::GetLastRunTime() const { |
| absl::MutexLock lock(&time_info_mutex_); |
| return time_info_.last_run_time; |
| } |
| |
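| // Arms the timer for the next execution. The completion handler captures |
| // only a weak_ptr, so a Task that has been cancelled and destroyed is not |
| // kept alive by, or touched through, a pending wait. |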
| void Task::ScheduleNextRun() { |
| { |
| absl::MutexLock lock(&time_info_mutex_); |
| if (clock_ != nullptr) { |
| time_info_.last_scheduled_time = clock_->Now(); |
| } |
| } |
| |
| { |
| absl::MutexLock lock(&timer_mutex_); |
| if (cancelled_.load()) { |
| return; |
| } |
| |
| // Wait for 90% of the period to account for userspace scheduling |
| // delays. |
| timer_->expires_after(absl::ToChronoMicroseconds(period_.load() * 0.9)); |
| timer_->async_wait([weak_self = std::weak_ptr<Task>(shared_from_this())]( |
| boost::system::error_code error) { |
| if (error == boost::asio::error::operation_aborted) { |
| // The wait was aborted, typically because the task was cancelled or |
| // its period was updated. |
| DLOG(WARNING) << "Timer wait aborted: " << error.message(); |
| return; |
| } |
| std::shared_ptr<Task> task = weak_self.lock(); |
| if (task == nullptr) { |
| DLOG(WARNING) << "Task is cancelled"; |
| return; |
| } |
| task->Run(); |
| }); |
| } |
| } |
| |
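| // Invoked through the ack callback once an execution finishes: bumps the |
| // execution count and re-arms the timer for the next run. |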
| void Task::OnDone() { |
| ++execution_count_; |
| ScheduleNextRun(); |
| } |
| |
| // Runs the task. The next run is scheduled from OnDone() once the task |
| // invokes its ack callback. |
| void Task::Run() { |
| if (cancelled_.load()) { |
| DLOG(WARNING) << "Task is cancelled before running!"; |
| return; |
| } |
| |
| { |
| absl::MutexLock lock(&time_info_mutex_); |
| // Guard against a null clock, consistent with ScheduleNextRun(). |
| if (clock_ != nullptr) { |
| absl::Time now = clock_->Now(); |
| if (time_info_.last_scheduled_time != absl::InfiniteFuture()) { |
| time_info_.wait_times.push_back(now - time_info_.last_scheduled_time); |
| } |
| time_info_.last_run_time = now; |
| } |
| } |
| |
| { |
| absl::MutexLock lock(&task_mutex_); |
| if (task_fn_with_ack_ != nullptr) { |
| task_fn_with_ack_( |
| [weak_self = std::weak_ptr<Task>(shared_from_this())]() { |
| std::shared_ptr<Task> task = weak_self.lock(); |
| if (task == nullptr) { |
| DLOG(WARNING) << "Task is destroyed while waiting for ack!"; |
| return; |
| } |
| task->OnDone(); |
| }); |
| return; |
| } |
| } |
| } |
| |
| // Cancels the task and aborts any pending timer wait. |
| void Task::Cancel() { |
| cancelled_.store(true); |
| absl::MutexLock lock(&timer_mutex_); |
| timer_->cancel(); |
| } |
| |
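| // Updates the period by aborting the pending wait and re-arming the timer. |
| // The task itself is preserved; only its schedule changes. |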
| void Task::SetPeriod(absl::Duration period) { |
| Cancel(); |
| period_ = period; |
| cancelled_.store(false); |
| ScheduleNextRun(); |
| } |
| |
| TaskScheduler::TaskScheduler(ecclesia::Clock* clock, const Options& options) |
| : next_task_id_(1), |
| clock_(clock), |
| io_(std::make_unique<boost::asio::io_context>()), |
| work_guard_(io_->get_executor()), |
| async_executor_thread_([this]() { io_->run(); }), |
| default_task_timeout_(options.default_task_timeout) { |
| cleanup_task_id_ = RunAndScheduleAsync( |
| [this](absl::AnyInvocable<void()> on_done) { |
| CleanupInactiveTasks(); |
| on_done(); |
| }, |
| options.cleanup_tasks_period); |
| } |
| |
| TaskScheduler::~TaskScheduler() { Stop(); } |
| |
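| // Runs `task` once immediately, then re-schedules it every `period`. |
| // Returns the task id, or -1 on failure. Illustrative usage (a sketch; |
| // `scheduler` and DoWork() are placeholders assumed for the example): |
| // |
| //   int id = scheduler.RunAndScheduleAsync( |
| //       [](absl::AnyInvocable<void()> on_done) { |
| //         DoWork(); |
| //         on_done();  // Ack completion; the next run is scheduled here. |
| //       }, |
| //       absl::Seconds(5)); |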
| int TaskScheduler::RunAndScheduleAsync( |
| absl::AnyInvocable<void(absl::AnyInvocable<void()>)> task, |
| absl::Duration period, std::optional<absl::Duration> timeout) { |
| if (task == nullptr) { |
| LOG(ERROR) << "Task is null"; |
| return -1; |
| } |
| |
| if (stop_.load()) { |
| LOG(ERROR) << "TaskScheduler is stopped, cannot schedule async task"; |
| return -1; |
| } |
| |
| int task_id = next_task_id_++; |
| absl::Duration task_timeout = timeout.value_or(default_task_timeout_); |
| auto task_ptr = |
| std::make_shared<Task>(task_id, std::move(task), period, io_.get(), |
| task_timeout, TaskMode::kPeriodic, clock_); |
| task_ptr->Run(); |
| absl::MutexLock lock(&async_tasks_mutex_); |
| id_to_async_tasks_.emplace(task_id, task_ptr); |
| return task_id; |
| } |
| |
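| // Schedules `task` to run exactly once after `period` elapses (for a |
| // one-shot task the period acts as a delay). The ack handed to the task |
| // cancels and removes it, so it is never re-armed. Illustrative usage (a |
| // sketch; `scheduler` is a placeholder assumed for the example): |
| // |
| //   int id = scheduler.ScheduleOneShotAsync( |
| //       [](absl::AnyInvocable<void()> done) { |
| //         // ... do the one-shot work ... |
| //         done();  // Cancels and removes the task. |
| //       }, |
| //       /*period=*/absl::Seconds(30)); |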
| int TaskScheduler::ScheduleOneShotAsync( |
| absl::AnyInvocable<void(absl::AnyInvocable<void()>)> task, |
| absl::Duration period, std::optional<absl::Duration> timeout) { |
| if (task == nullptr) { |
| LOG(ERROR) << "Task is null"; |
| return -1; |
| } |
| |
| if (stop_.load()) { |
| LOG(ERROR) << "TaskScheduler is stopped, cannot run and schedule task"; |
| return -1; |
| } |
| |
| int task_id = next_task_id_++; |
| absl::Duration task_timeout = timeout.value_or(default_task_timeout_); |
| |
| auto oneshot_on_done = [this, task_id]() { Cancel(task_id); }; |
| auto task_ptr = std::make_shared<Task>( |
| task_id, |
| [task = std::move(task), oneshot_on_done = std::move(oneshot_on_done)]( |
| absl::AnyInvocable<void()> /*periodic_on_done*/) mutable { |
| // Ignore the periodic reschedule ack; hand the task the one-shot |
| // cancel callback instead so it is removed once it completes. |
| task(std::move(oneshot_on_done)); |
| }, |
| period, io_.get(), task_timeout, TaskMode::kOnce, clock_); |
| { |
| absl::MutexLock lock(&async_tasks_mutex_); |
| id_to_async_tasks_.emplace(task_id, task_ptr); |
| } |
| task_ptr->ScheduleNextRun(); |
| |
| return task_id; |
| } |
| |
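| // Returns a point-in-time copy of the task map. The shared_ptrs keep the |
| // snapshotted tasks alive even if they are concurrently cancelled and |
| // removed from the scheduler. |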
| absl::flat_hash_map<int, std::shared_ptr<Task>> TaskScheduler::GetSnapshot() |
| const { |
| absl::flat_hash_map<int, std::shared_ptr<Task>> snapshot; |
| { |
| absl::MutexLock lock(&async_tasks_mutex_); |
| for (const auto& [task_id, task] : id_to_async_tasks_) { |
| snapshot.emplace(task_id, task); |
| } |
| } |
| return snapshot; |
| } |
| |
| size_t TaskScheduler::GetAllUserTaskCount() const { |
| absl::MutexLock lock(&async_tasks_mutex_); |
| // We always have one internal cleanup task scheduled; exclude it from the |
| // user count. Guard against underflow when the map is empty (e.g. after |
| // CancelAll()). |
| return id_to_async_tasks_.empty() ? 0 : id_to_async_tasks_.size() - 1; |
| } |
| |
| size_t TaskScheduler::GetAllTaskCount() const { |
| absl::MutexLock lock(&async_tasks_mutex_); |
| return id_to_async_tasks_.size(); |
| } |
| |
| void TaskScheduler::UpdateTaskPeriod(int task_id, absl::Duration period) { |
| if (stop_.load()) { |
| LOG(WARNING) << "TaskScheduler is stopped, cannot update task period: " |
| << task_id; |
| return; |
| } |
| |
| absl::MutexLock lock(&async_tasks_mutex_); |
| auto it = id_to_async_tasks_.find(task_id); |
| if (it != id_to_async_tasks_.end()) { |
| it->second->SetPeriod(period); |
| } |
| } |
| |
| void TaskScheduler::Cancel(int task_id) { |
| if (stop_.load()) { |
| LOG(WARNING) << "TaskScheduler is stopped, cannot cancel task: " << task_id; |
| return; |
| } |
| |
| // Cancel the async task. |
| { |
| absl::MutexLock lock(&async_tasks_mutex_); |
| auto it = id_to_async_tasks_.find(task_id); |
| if (it != id_to_async_tasks_.end()) { |
| it->second->Cancel(); |
| // Remove the task from the tasks map. |
| id_to_async_tasks_.erase(it); |
| } |
| } |
| } |
| |
| void TaskScheduler::CancelAll() { |
| if (stop_.load()) { |
| LOG(ERROR) << "TaskScheduler is stopped, cannot cancel all tasks"; |
| return; |
| } |
| |
| { |
| absl::MutexLock lock(&async_tasks_mutex_); |
| for (const auto& [task_id, task] : id_to_async_tasks_) { |
| task->Cancel(); |
| } |
| id_to_async_tasks_.clear(); |
| } |
| } |
| |
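| // Runs as the scheduler's internal cleanup task (armed in the constructor): |
| // cancels any task whose last run is older than its timeout, e.g. a task |
| // whose ack callback was never invoked. |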
| void TaskScheduler::CleanupInactiveTasks() { |
| if (stop_.load()) { |
| return; |
| } |
| |
| // Copy id_to_async_tasks_ to a vector to avoid holding the lock while |
| // cancelling tasks. |
| std::vector<std::pair<int, std::shared_ptr<Task>>> id_to_async_tasks_copy; |
| // (task id, timeout) pairs for tasks that should be cancelled. |
| std::vector<std::pair<int, absl::Duration>> tasks_to_cancel; |
| { |
| absl::MutexLock lock(&async_tasks_mutex_); |
| id_to_async_tasks_copy = std::vector<std::pair<int, std::shared_ptr<Task>>>( |
| id_to_async_tasks_.begin(), id_to_async_tasks_.end()); |
| } |
| for (const auto& [task_id, task] : id_to_async_tasks_copy) { |
| absl::Time last_run_time = task->GetLastRunTime(); |
| if (last_run_time == absl::InfiniteFuture()) { |
| continue; |
| } |
| absl::Duration time_since_last_run = clock_->Now() - last_run_time; |
| absl::Duration task_timeout = task->GetTaskTimeout(); |
| if (time_since_last_run > task_timeout) { |
| tasks_to_cancel.push_back({task_id, task_timeout}); |
| } |
| } |
| for (const auto& [task_id, task_timeout] : tasks_to_cancel) { |
| LOG(WARNING) << "Cancelling inactive task: " << task_id |
| << " as it has not run for the last " << task_timeout; |
| Cancel(task_id); |
| } |
| } |
| |
| void TaskScheduler::Stop() { |
| if (stop_.load()) { |
| // Already stopped. |
| return; |
| } |
| |
| CancelAll(); |
| stop_.store(true); |
| |
| if (async_executor_thread_.joinable()) { |
| io_->stop(); |
| async_executor_thread_.join(); |
| } |
| } |
| |
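| // Computes a coarse scheduling-accuracy metric. Per task, inaccuracy is the |
| // average lateness beyond its period, as a percentage of the period: |
| // |
| //   inaccuracy = max(0, average_wait_time - period) / period * 100 |
| // |
| // The result is 100 minus the mean per-task inaccuracy, clamped to [0, 100]. |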
| size_t TaskScheduler::GetSchedulingAccuracyPercentage() const { |
| // Copy id_to_async_tasks_ to avoid holding the lock while calculating |
| // accuracy. |
| absl::flat_hash_map<int, std::shared_ptr<Task>> id_to_async_tasks_copy; |
| { |
| absl::MutexLock lock(&async_tasks_mutex_); |
| id_to_async_tasks_copy = id_to_async_tasks_; |
| } |
| |
| if (id_to_async_tasks_copy.empty()) { |
| // If there are no tasks, we consider the scheduler to be accurate. |
| return 100; |
| } |
| |
| double total_inaccuracy = 0.0; |
| |
| for (const auto& [task_id, task] : id_to_async_tasks_copy) { |
| absl::Duration average_wait_time = task->GetAverageWaitTime(); |
| if (average_wait_time == absl::InfiniteDuration()) { |
| // No wait times have been recorded yet. This happens before the task has |
| // completed its first scheduled run; we don't penalize the scheduler for |
| // it, so the task contributes zero inaccuracy. |
| continue; |
| } |
| double task_inaccuracy; |
| absl::Duration period = task->GetPeriod(); |
| if (average_wait_time <= period) { |
| task_inaccuracy = 0; |
| } else { |
| // Inaccuracy is based on how much *later* than the period the task waited. |
| absl::Duration lateness = average_wait_time - period; |
| DLOG(INFO) << "Task " << task_id << " lateness: " << lateness; |
| // Scale inaccuracy linearly with lateness relative to the period. |
| task_inaccuracy = (absl::ToDoubleNanoseconds(lateness) / |
| absl::ToDoubleNanoseconds(period)) * |
| 100.0; |
| // Clamp at zero for safety (lateness is positive here, so this is a no-op). |
| task_inaccuracy = std::max(0.0, task_inaccuracy); |
| } |
| total_inaccuracy += task_inaccuracy; |
| DLOG(INFO) << "Task " << task_id << "has period: " << period |
| << " average wait time: " << average_wait_time |
| << " with inaccuracy: " << task_inaccuracy |
| << "% (based on being late)"; |
| } |
| |
| size_t overall_inaccuracy_percentage = static_cast<size_t>( |
| total_inaccuracy / static_cast<double>(id_to_async_tasks_copy.size())); |
| // Clamp so heavily delayed tasks cannot push inaccuracy above 100%, which |
| // would underflow the accuracy computed below. |
| overall_inaccuracy_percentage = |
| std::min<size_t>(overall_inaccuracy_percentage, 100); |
| size_t overall_accuracy_percentage = 100 - overall_inaccuracy_percentage; |
| DLOG(INFO) << "Overall accuracy percentage: " << overall_accuracy_percentage; |
| return overall_accuracy_percentage; |
| } |
| |
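| // Produces a JSON summary shaped roughly as follows (values illustrative): |
| // |
| //   { |
| //     "scheduling_accuracy_percentage": 98, |
| //     "scheduler_tasks": [ ... ],  // The internal cleanup task. |
| //     "user_tasks": [ ... ], |
| //     "user_async_task_count": 3, |
| //     "total_async_task_count": 4 |
| //   } |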
| nlohmann::json TaskScheduler::ToJson() const { |
| nlohmann::json json; |
| json["scheduling_accuracy_percentage"] = GetSchedulingAccuracyPercentage(); |
| absl::MutexLock lock(&async_tasks_mutex_); |
| for (const auto& [task_id, task] : id_to_async_tasks_) { |
| if (task_id == cleanup_task_id_) { |
| json["scheduler_tasks"].push_back(task->ToJson()); |
| continue; |
| } |
| json["user_tasks"].push_back(task->ToJson()); |
| } |
| json["user_async_task_count"] = id_to_async_tasks_.size() - 1; |
| json["total_async_task_count"] = id_to_async_tasks_.size(); |
| return json; |
| } |
| |
| } // namespace milotic_tlbmc |