blob: ff2ec767fe46e8cc9b4a8c7fb5b580e8d7767fac [file] [log] [blame]
#include "tlbmc/scheduler/scheduler.h"
#include <algorithm>
#include <cstddef>
#include <memory>
#include <optional>
#include <thread>
#include <utility>
#include <vector>
#include "absl/container/flat_hash_map.h"
#include "absl/functional/any_invocable.h"
#include "absl/log/log.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "boost/asio/executor.hpp"
#include "boost/asio/executor_work_guard.hpp"
#include "time/clock.h"
#include "nlohmann/json.hpp"
namespace milotic_tlbmc {
// Serializes this timing record to JSON.
//
// The output holds the formatted last-scheduled and last-run timestamps plus
// "wait_times_recent_to_oldest": at most the ten newest recorded wait times,
// newest first. The array is present (and empty) when nothing was recorded.
nlohmann::json TimeInfo::ToJson() const {
  constexpr int kMaxReportedWaitTimes = 10;

  // Walk the recorded wait times backwards so the newest entry comes first.
  nlohmann::json recent_wait_times = nlohmann::json::array();
  int reported = 0;
  for (auto wait = wait_times.rbegin();
       reported < kMaxReportedWaitTimes && wait != wait_times.rend();
       ++wait, ++reported) {
    recent_wait_times.push_back(absl::FormatDuration(*wait));
  }

  nlohmann::json result;
  result["last_scheduled_time"] = absl::FormatTime(last_scheduled_time);
  result["last_run_time"] = absl::FormatTime(last_run_time);
  result["wait_times_recent_to_oldest"] = std::move(recent_wait_times);
  return result;
}
// Serializes the task's identity, configuration and runtime statistics to
// JSON for diagnostics.
nlohmann::json Task::ToJson() const {
  nlohmann::json result;
  result["task_id"] = task_id_;
  result["period"] = absl::FormatDuration(period_.load());
  result["execution_count"] = execution_count_.load();

  // Serialize the timing record while holding its mutex, then release the
  // mutex before GetAverageWaitTime(), which acquires the same mutex itself.
  {
    absl::MutexLock time_info_lock(&time_info_mutex_);
    result["time_info"] = time_info_.ToJson();
  }
  result["average_wait_time"] = absl::FormatDuration(GetAverageWaitTime());
  result["cancelled"] = cancelled_.load();

  if (task_mode_ == TaskMode::kPeriodic) {
    result["task_mode"] = "periodic";
  } else {
    result["task_mode"] = "once";
  }
  return result;
}
// Overload constructor for tasks that need to be invoked with an ack callback.
//
// Parameters:
//   task_id      - identifier stored in task_id_ and reported by ToJson().
//   task_fn      - callable that Run() invokes with a completion (ack)
//                  callback; invoking that callback leads to OnDone().
//   period       - scheduling period; ScheduleNextRun() arms the timer for 90%
//                  of this value.
//   io           - io_context the steady timer is created on. It is
//                  dereferenced here, so it must not be null.
//   task_timeout - stored in task_timeout_.
//                  NOTE(review): presumably what GetTaskTimeout() returns for
//                  the scheduler's inactivity cleanup - confirm in the header.
//   task_mode    - periodic or one-shot; only reported by ToJson() in this
//                  file.
//   clock        - time source used to stamp scheduling and run times.
//
// The task starts in the non-cancelled state. Constructing it does not run or
// schedule anything.
Task::Task(int task_id,
absl::AnyInvocable<void(absl::AnyInvocable<void()>)> task_fn,
absl::Duration period, boost::asio::io_context* io,
absl::Duration task_timeout, TaskMode task_mode,
ecclesia::Clock* clock)
: task_id_(task_id),
task_mode_(task_mode),
clock_(clock),
period_(period),
cancelled_(false),
task_fn_with_ack_(std::move(task_fn)),
timer_(std::make_unique<boost::asio::steady_timer>(*io)),
task_timeout_(task_timeout) {}
// Returns the currently configured scheduling period.
absl::Duration Task::GetPeriod() const {
  const absl::Duration current_period = period_.load();
  return current_period;
}
// Returns the mean of all recorded wait times (time between a run being
// scheduled and the run actually starting).
//
// Returns absl::InfiniteDuration() when no wait time has been recorded yet,
// which is the case until the task has been scheduled and run again after its
// initial run.
absl::Duration Task::GetAverageWaitTime() const {
  absl::MutexLock time_info_lock(&time_info_mutex_);
  const auto& samples = time_info_.wait_times;
  if (!samples.empty()) {
    absl::Duration sum = absl::ZeroDuration();
    for (auto sample = samples.begin(); sample != samples.end(); ++sample) {
      sum += *sample;
    }
    return sum / samples.size();
  }
  // Nothing recorded yet: signal "unknown" with an infinite duration.
  return absl::InfiniteDuration();
}
// Returns the time at which Run() last started executing the task, read under
// the time-info mutex.
absl::Time Task::GetLastRunTime() const {
  absl::Time last_run;
  {
    absl::MutexLock time_info_lock(&time_info_mutex_);
    last_run = time_info_.last_run_time;
  }
  return last_run;
}
void Task::ScheduleNextRun() {
{
absl::MutexLock lock(&time_info_mutex_);
if (clock_ != nullptr) {
time_info_.last_scheduled_time = clock_->Now();
}
}
{
absl::MutexLock lock(&timer_mutex_);
if (cancelled_.load()) {
return;
}
// Wait for 90% of the period to account for userspace scheduling
// delays.
timer_->expires_after(absl::ToChronoMicroseconds(period_ * 0.9));
timer_->async_wait([weak_self = std::weak_ptr<Task>(shared_from_this())](
boost::system::error_code error) {
if (error == boost::asio::error::operation_aborted) {
DLOG(ERROR) << "Failed to wait for task to complete with error: "
<< error.message();
return;
}
std::shared_ptr<Task> task = weak_self.lock();
if (task == nullptr) {
DLOG(WARNING) << "Task is cancelled";
return;
}
task->Run();
});
}
}
// Completion hook reached through the ack callback handed to the task
// function: counts the finished execution and schedules the next run.
void Task::OnDone() {
  execution_count_.fetch_add(1);
  ScheduleNextRun();
}
// Runs the task and updates the next run time.
void Task::Run() {
if (cancelled_.load()) {
DLOG(WARNING) << "Task is cancelled before running!";
return;
}
{
absl::MutexLock lock(&time_info_mutex_);
absl::Time now = clock_->Now();
if (time_info_.last_scheduled_time != absl::InfiniteFuture()) {
time_info_.wait_times.push_back(now - time_info_.last_scheduled_time);
}
time_info_.last_run_time = now;
}
{
absl::MutexLock lock(&task_mutex_);
if (task_fn_with_ack_ != nullptr) {
task_fn_with_ack_(
[weak_self = std::weak_ptr<Task>(shared_from_this())]() {
std::shared_ptr<Task> task = weak_self.lock();
if (task == nullptr) {
DLOG(WARNING) << "Task is destroyed while waiting for ack!";
return;
}
task->OnDone();
});
return;
}
}
}
// Cancels the task: marks it cancelled so Run() and ScheduleNextRun() become
// no-ops, then cancels any pending timer wait.
void Task::Cancel() {
  // Set the flag before touching the timer so a concurrent ScheduleNextRun()
  // that takes the timer mutex afterwards sees the cancellation.
  cancelled_.store(true);
  {
    absl::MutexLock timer_lock(&timer_mutex_);
    timer_->cancel();
  }
}
// Changes the task's period and restarts its schedule.
//
// The pending wait is cancelled, the new period is stored, the cancelled flag
// is cleared again, and the next run is scheduled using the new period.
void Task::SetPeriod(absl::Duration period) {
  // Stop the wait that was armed with the old period.
  Cancel();

  period_.store(period);

  // Cancel() set the flag; clear it so the task can be scheduled again.
  cancelled_.store(false);
  ScheduleNextRun();
}
// Constructs the scheduler and starts its executor thread.
//
// Creates the io_context, a work guard on its executor, and a thread that
// calls io_->run(). Task ids start at 1.
// NOTE(review): the work guard presumably keeps io_->run() from returning
// while no timers are pending - confirm against the member declaration.
//
// The constructor then registers the scheduler's own periodic cleanup task via
// RunAndScheduleAsync(), which also runs it once immediately on the
// constructing thread, and remembers its id in cleanup_task_id_.
TaskScheduler::TaskScheduler(ecclesia::Clock* clock, const Options& options)
: next_task_id_(1),
clock_(clock),
io_(std::make_unique<boost::asio::io_context>()),
work_guard_(io_->get_executor()),
async_executor_thread_([this]() { io_->run(); }),
default_task_timeout_(options.default_task_timeout) {
cleanup_task_id_ = RunAndScheduleAsync(
[this](absl::AnyInvocable<void()> on_done) {
CleanupInactiveTasks();
// Ack right away so the cleanup task is rescheduled for its next period.
on_done();
},
options.cleanup_tasks_period);
}
// Stops the scheduler on destruction; Stop() returns early if the scheduler
// has already been stopped.
TaskScheduler::~TaskScheduler() {
  Stop();
}
// Creates a periodic task, runs it once immediately on the calling thread, and
// registers it with the scheduler.
//
// `task` receives an ack callback; the next run is scheduled only after the
// task invokes it. `timeout` overrides the scheduler's default task timeout
// when provided.
//
// Returns the new task id, or -1 if `task` is null or the scheduler has been
// stopped.
int TaskScheduler::RunAndScheduleAsync(
    absl::AnyInvocable<void(absl::AnyInvocable<void()>)> task,
    absl::Duration period, std::optional<absl::Duration> timeout) {
  if (task == nullptr) {
    LOG(ERROR) << "Task is null";
    return -1;
  }
  if (stop_.load()) {
    LOG(ERROR) << "TaskScheduler is stopped, cannot schedule async task";
    return -1;
  }

  const int new_task_id = next_task_id_++;
  std::shared_ptr<Task> new_task = std::make_shared<Task>(
      new_task_id, std::move(task), period, io_.get(),
      timeout.value_or(default_task_timeout_), TaskMode::kPeriodic, clock_);

  // First execution happens here, before the task becomes visible in the map.
  new_task->Run();

  absl::MutexLock tasks_lock(&async_tasks_mutex_);
  id_to_async_tasks_.emplace(new_task_id, new_task);
  return new_task_id;
}
// Registers a one-shot task and schedules its single run after the timer delay
// derived from `period` (see Task::ScheduleNextRun()).
//
// The callback handed to `task` does not reschedule the task; invoking it
// cancels the task and removes it from the scheduler. `timeout` overrides the
// scheduler's default task timeout when provided.
//
// Returns the new task id, or -1 if `task` is null or the scheduler has been
// stopped.
int TaskScheduler::ScheduleOneShotAsync(
    absl::AnyInvocable<void(absl::AnyInvocable<void()>)> task,
    absl::Duration period, std::optional<absl::Duration> timeout) {
  if (task == nullptr) {
    LOG(ERROR) << "Task is null";
    return -1;
  }
  if (stop_.load()) {
    LOG(ERROR) << "TaskScheduler is stopped, cannot run and schedule task";
    return -1;
  }

  const int new_task_id = next_task_id_++;

  // Completion callback given to the user task: unregisters the one-shot task.
  auto remove_self = [this, new_task_id]() { Cancel(new_task_id); };

  // Adapter matching the Task callable signature. The periodic ack supplied by
  // Task::Run() is intentionally dropped so the task never reschedules.
  auto run_once = [user_task = std::move(task),
                   remove_self = std::move(remove_self)](
                      absl::AnyInvocable<void()> /*periodic_ack*/) mutable {
    user_task(std::move(remove_self));
  };

  std::shared_ptr<Task> one_shot = std::make_shared<Task>(
      new_task_id, std::move(run_once), period, io_.get(),
      timeout.value_or(default_task_timeout_), TaskMode::kOnce, clock_);

  // Register first so the task can be found by id once its timer fires.
  {
    absl::MutexLock tasks_lock(&async_tasks_mutex_);
    id_to_async_tasks_.emplace(new_task_id, one_shot);
  }
  one_shot->ScheduleNextRun();
  return new_task_id;
}
// Returns a copy of the id -> task map taken under the tasks mutex. The copy
// shares ownership of the Task objects with the scheduler.
absl::flat_hash_map<int, std::shared_ptr<Task>> TaskScheduler::GetSnapshot()
    const {
  absl::MutexLock tasks_lock(&async_tasks_mutex_);
  // The returned copy is constructed before the lock is released.
  return id_to_async_tasks_;
}
size_t TaskScheduler::GetAllUserTaskCount() const {
absl::MutexLock lock(&async_tasks_mutex_);
// We always have one internal task scheduled for cleanup.
return id_to_async_tasks_.size() - 1;
}
// Returns the number of registered tasks, including the scheduler's internal
// ones.
size_t TaskScheduler::GetAllTaskCount() const {
  size_t task_count = 0;
  {
    absl::MutexLock tasks_lock(&async_tasks_mutex_);
    task_count = id_to_async_tasks_.size();
  }
  return task_count;
}
// Changes the period of the task with the given id and restarts its schedule.
//
// Logs a warning and does nothing if the scheduler is stopped. Unknown task
// ids are ignored silently.
void TaskScheduler::UpdateTaskPeriod(int task_id, absl::Duration period) {
  if (stop_.load()) {
    LOG(WARNING) << "TaskScheduler is stopped, cannot update task period: "
                 << task_id;
    return;
  }

  absl::MutexLock tasks_lock(&async_tasks_mutex_);
  const auto entry = id_to_async_tasks_.find(task_id);
  if (entry == id_to_async_tasks_.end()) {
    return;
  }
  entry->second->SetPeriod(period);
}
// Cancels the task with the given id and removes it from the scheduler.
//
// Logs a warning and does nothing if the scheduler is stopped. Unknown task
// ids are ignored silently.
void TaskScheduler::Cancel(int task_id) {
  if (stop_.load()) {
    LOG(WARNING) << "TaskScheduler is stopped, cannot cancel task: " << task_id;
    return;
  }

  absl::MutexLock tasks_lock(&async_tasks_mutex_);
  const auto entry = id_to_async_tasks_.find(task_id);
  if (entry == id_to_async_tasks_.end()) {
    return;
  }
  // Stop the task's timer first, then drop the scheduler's reference to it.
  entry->second->Cancel();
  id_to_async_tasks_.erase(entry);
}
// Cancels every registered task, including the scheduler's internal cleanup
// task, and empties the task map.
//
// Logs an error and does nothing if the scheduler is stopped.
void TaskScheduler::CancelAll() {
  if (stop_.load()) {
    LOG(ERROR) << "TaskScheduler is stopped, cannot cancel all tasks";
    return;
  }

  absl::MutexLock tasks_lock(&async_tasks_mutex_);
  for (auto entry = id_to_async_tasks_.begin();
       entry != id_to_async_tasks_.end(); ++entry) {
    entry->second->Cancel();
  }
  id_to_async_tasks_.clear();
}
void TaskScheduler::CleanupInactiveTasks() {
if (stop_.load()) {
return;
}
// Copy id_to_async_tasks_ to a vector to avoid holding the lock while
// cancelling tasks.
std::vector<std::pair<int, std::shared_ptr<Task>>> id_to_async_tasks_copy;
// Task ids of Tasks to cancel.
std::vector<int> tasks_to_cancel;
{
absl::MutexLock lock(&async_tasks_mutex_);
id_to_async_tasks_copy = std::vector<std::pair<int, std::shared_ptr<Task>>>(
id_to_async_tasks_.begin(), id_to_async_tasks_.end());
}
for (const auto& [task_id, task] : id_to_async_tasks_copy) {
absl::Time last_run_time = task->GetLastRunTime();
if (last_run_time == absl::InfiniteFuture()) {
continue;
}
absl::Duration time_since_last_run = clock_->Now() - last_run_time;
if (time_since_last_run > task->GetTaskTimeout()) {
tasks_to_cancel.push_back(task_id);
}
}
for (int task_id : tasks_to_cancel) {
LOG(ERROR) << "Cleanup inactive task: " << task_id
<< " as it has not run for the last " << default_task_timeout_;
Cancel(task_id);
}
}
void TaskScheduler::Stop() {
if (stop_.load()) {
// Already stopped.
return;
}
CancelAll();
stop_.store(true);
if (async_executor_thread_.joinable()) {
io_->stop();
async_executor_thread_.join();
}
}
size_t TaskScheduler::GetSchedulingAccuracyPercentage() const {
// Copy id_to_async_tasks_ to avoid holding the lock while calculating
// accuracy.
absl::flat_hash_map<int, std::shared_ptr<Task>> id_to_async_tasks_copy;
{
absl::MutexLock lock(&async_tasks_mutex_);
id_to_async_tasks_copy = id_to_async_tasks_;
}
if (id_to_async_tasks_copy.empty()) {
// If there are no tasks, we consider the scheduler to be accurate.
return 100;
}
double total_inaccuracy = 0.0;
for (const auto& [task_id, task] : id_to_async_tasks_copy) {
absl::Duration average_wait_time = task->GetAverageWaitTime();
double task_inaccuracy;
if (average_wait_time == absl::InfiniteDuration()) {
// No wait times have been recorded yet. This happens when the task is yet
// to be scheduled after it's initial run.
// We don't want to penalize the scheduler for this case. So we consider
// the inaccuracy to be 0.
continue;
}
absl::Duration period = task->GetPeriod();
if (average_wait_time <= period) {
task_inaccuracy = 0;
} else {
// Inaccuracy based on how much *later* than the period it waited
absl::Duration lateness = average_wait_time - period;
DLOG(INFO) << "Task " << task_id << " lateness: " << lateness;
// Define how inaccuracy scales with lateness.
task_inaccuracy = (absl::ToDoubleNanoseconds(lateness) /
absl::ToDoubleNanoseconds(period)) *
100.0;
// Ensure inaccuracy is not negative (shouldn't happen here but for
// safety)
task_inaccuracy = std::max(0.0, task_inaccuracy);
}
total_inaccuracy += task_inaccuracy;
DLOG(INFO) << "Task " << task_id << "has period: " << period
<< " average wait time: " << average_wait_time
<< " with inaccuracy: " << task_inaccuracy
<< "% (based on being late)";
}
size_t overall_inaccuracy_percentage = 0;
overall_inaccuracy_percentage = static_cast<size_t>(
total_inaccuracy / static_cast<double>(id_to_async_tasks_copy.size()));
size_t overall_accuracy_percentage = 100 - overall_inaccuracy_percentage;
DLOG(INFO) << "Overall accuracy percentage: " << overall_accuracy_percentage;
return overall_accuracy_percentage;
}
// Serializes the scheduler state to JSON for diagnostics.
//
// Emits the overall scheduling accuracy, the internal cleanup task under
// "scheduler_tasks", all other tasks under "user_tasks", and the task counts.
// Either task list key is absent when there is no task of that kind.
//
// "user_async_task_count" is the number of tasks actually listed as user
// tasks. Deriving it as size() - 1 wraps around when the map is empty and
// undercounts when the cleanup task is not registered (CancelAll() removes
// it).
nlohmann::json TaskScheduler::ToJson() const {
  nlohmann::json json;
  // Computed before taking the tasks mutex: the accuracy calculation acquires
  // that mutex itself.
  json["scheduling_accuracy_percentage"] = GetSchedulingAccuracyPercentage();

  absl::MutexLock lock(&async_tasks_mutex_);
  size_t user_task_count = 0;
  for (const auto& [task_id, task] : id_to_async_tasks_) {
    if (task_id == cleanup_task_id_) {
      json["scheduler_tasks"].push_back(task->ToJson());
      continue;
    }
    json["user_tasks"].push_back(task->ToJson());
    ++user_task_count;
  }
  json["user_async_task_count"] = user_task_count;
  json["total_async_task_count"] = id_to_async_tasks_.size();
  return json;
}
} // namespace milotic_tlbmc