blob: d1869ef8b8ecca3aad11877c5040f7c19cc6bffa [file] [log] [blame]
#pragma once
#include <boost/asio/steady_timer.hpp>
#include <chrono>
#include <functional>
#include <iostream>
#include <memory>
#include <set>
#include <vector>
constexpr unsigned backgroundBps = 4 * 1024;
template <class ClockType_>
class Scheduler;
template <class ClockType_ = std::chrono::steady_clock>
class MetricBase : public std::enable_shared_from_this<MetricBase<ClockType_>>
{
// type definition
public:
using ClockType = ClockType_;
// constructor
/**
* interval: refresh rate for the Metric
*/
MetricBase(std::shared_ptr<Scheduler<ClockType_>> scheduler,
ClockType_::duration interval) :
scheduler(scheduler), interval(interval)
{
// partialRefreshCBs.emplace_back(std::bind(scheduler->callback,
// nullptr));
}
virtual ~MetricBase()
{
for (auto&& func : partialRefreshCBs)
{
if (func)
{
func(std::make_error_code(std::errc::operation_canceled));
}
}
for (auto&& func : refreshCBs)
{
if (func)
{
func(std::make_error_code(std::errc::operation_canceled));
}
}
for (auto&& func : completeCBs)
{
if (func)
{
func(std::make_error_code(std::errc::operation_canceled));
}
}
}
// firendship
private:
friend class Scheduler<ClockType>;
// variables
protected:
std::shared_ptr<Scheduler<ClockType>> scheduler;
const ClockType::duration interval;
std::optional<
std::pair<std::chrono::time_point<ClockType>, std::vector<uint8_t>>>
inprocessData; // start time, data
private:
bool refreshing = false;
std::vector<std::function<void(std::error_code ec)>> refreshCBs;
std::vector<std::function<void(std::error_code ec)>> partialRefreshCBs;
std::vector<std::function<void(std::error_code ec)>> completeCBs;
/* methods */
public:
// refresh the whole cache
void refresh(std::function<void(std::error_code ec)>&& cb);
// refresh partial of the cache if the data is large.
void partialRefresh(std::function<void(std::error_code ec)>&& cb);
/**
* register a callback which will be triggered for every refresh completion.
* the callback will NOT be removed after a single refresh completion
*/
void registerCompleteCB(std::function<void(std::error_code ec)>&& cb)
{
completeCBs.push_back(std::move(cb));
}
bool isRefreshing() const;
// bool isPartialRefreshing() const = 0;
protected:
// read data from device and update the
// implemetation should extend this virtual function and do the real data
// fetch from device.
virtual void readDevice(
std::function<void(std::error_code ec, size_t size, bool complete)>&&
cb) noexcept = 0;
public:
// interface
virtual constexpr std::string_view getIdentifier() const noexcept = 0;
virtual bool isCacheValid() const noexcept = 0;
virtual std::tuple<std::chrono::time_point<ClockType>,
std::chrono::time_point<ClockType>,
std::span<const uint8_t>>
getCache() const noexcept = 0;
};
template <class ClockType_ = std::chrono::steady_clock>
class Scheduler : public std::enable_shared_from_this<Scheduler<ClockType_>>
{
public:
using ClockType = ClockType_;
using ScheduledTask = std::pair<std::chrono::time_point<ClockType>,
std::weak_ptr<MetricBase<ClockType>>>;
// template <class ClockType>
explicit Scheduler(boost::asio::io_context& io);
~Scheduler() = default;
// find the schuduler with give path for a storage componenet (e.g.
// Controller)
static std::shared_ptr<Scheduler<ClockType_>>
getScheduler(const std::string& path);
void enqueue(std::chrono::time_point<ClockType> scheduledTime,
std::shared_ptr<MetricBase<ClockType>> task);
void dequeue();
// void callback(std::shared_ptr<MetricBase<ClockType>> task, size_t size);
void start()
{
started = true;
}
private:
friend class MetricBase<ClockType_>;
boost::asio::io_context& io;
boost::asio::steady_timer timer;
bool started = false;
// Task Queue, sorted by Scheduled Time.
std::multiset<ScheduledTask,
bool (*)(const ScheduledTask&, const ScheduledTask&)>
taskQueue;
std::vector<std::shared_ptr<bool>> validFlags;
std::weak_ptr<MetricBase<ClockType>>
executingTask; // the metric is currently under execution.
};
template <class ClockType_>
void MetricBase<ClockType_>::refresh(
std::function<void(std::error_code ec)>&& cb)
{
refreshing = true;
refreshCBs.push_back(std::move(cb));
scheduler->enqueue(ClockType_::now(), this->shared_from_this());
}
template <class ClockType_>
void MetricBase<ClockType_>::partialRefresh(
std::function<void(std::error_code ec)>&& cb)
{
partialRefreshCBs.push_back(std::move(cb));
scheduler->enqueue(ClockType_::now(), this->shared_from_this());
}
template <class ClockType_>
bool MetricBase<ClockType_>::isRefreshing() const
{
return refreshing;
}
//
// void ::callback(
// std::shared_ptr<MetricBase<ClockType>> task, size_t size)
// {}
template <class ClockType>
bool compareScheduleTask(const typename Scheduler<ClockType>::ScheduledTask& a,
const typename Scheduler<ClockType>::ScheduledTask& b)
{
return a.first < b.first;
}
template <class ClockType_>
Scheduler<ClockType_>::Scheduler(boost::asio::io_context& io) :
io(io), timer(io), taskQueue(&compareScheduleTask<ClockType_>)
{}
template <class ClockType_>
void Scheduler<ClockType_>::enqueue(
std::chrono::time_point<ClockType> scheduledTime,
std::shared_ptr<MetricBase<ClockType>> task)
{
// check if the task is being executed.
if (!executingTask.owner_before(task) && !task.owner_before(executingTask))
{
return;
}
timer.cancel();
validFlags.clear();
assert(task && "insert invalid task into schduler");
// remove duplicate and find the earliest schdule time
std::chrono::time_point<ClockType> time = scheduledTime;
for (auto itr = taskQueue.begin(); itr != taskQueue.end();)
{
auto metric = itr->second.lock();
if (!metric)
{
// LOG_INFO
std::cerr << "the task is invalid, removed from task queue\n";
itr = taskQueue.erase(itr);
}
else if (!metric.owner_before(task) &&
!task.owner_before(metric)) // find duplicate task
{
time = std::min(time, itr->first);
itr = taskQueue.erase(itr);
}
else
{
itr++;
}
}
taskQueue.emplace(time, task);
if (started)
{
dequeue();
}
}
template <class ClockType_>
void Scheduler<ClockType_>::dequeue()
{
if (taskQueue.empty())
{
return;
}
const std::pair<std::chrono::time_point<ClockType_>,
std::weak_ptr<MetricBase<ClockType_>>>& head =
*taskQueue.begin();
validFlags.emplace_back(std::make_shared<bool>(true));
std::weak_ptr<bool> weakFlag = validFlags.back();
timer.expires_at(head.first);
timer.async_wait([weakSelf{this->weak_from_this()},
weakFlag](boost::system::error_code ec) {
if (ec || weakFlag.expired())
{
// LOG_INFO
std::cerr << "new expiration time set\n";
return;
}
auto self = weakSelf.lock();
if (!self)
{
// LOG_INFO
std::cerr << "the scheduler has stopped\n";
return;
}
auto node = self->taskQueue.extract(self->taskQueue.begin());
std::weak_ptr<MetricBase<ClockType_>> weakTask = node.value().second;
auto task = weakTask.lock();
if (!task)
{
// LOG_INFO
std::cerr << "the task is invalid, removed from task queue\n";
self->dequeue();
return;
}
self->executingTask = task;
task->readDevice(
[self{self}, weakTask{weakTask}](std::error_code /* ec */,
size_t size, bool complete) {
// todo: deal with ec
std::shared_ptr<MetricBase<ClockType_>> task = weakTask.lock();
if (!task)
{
std::cerr << "the task is invalid, removed from task queue\n";
self->executingTask.reset();
self->dequeue();
return;
}
// calcuate the next schedule time
std::chrono::time_point<ClockType_> time;
if (complete)
{
if (task->interval == ClockType_::duration::max())
{
time = ClockType_::time_point::max();
}
else
{
time = ClockType_::now() + task->interval;
}
}
else if (task->refreshing)
{
time = ClockType_::now();
}
else
{
time = ClockType_::now() +
std::chrono::milliseconds(static_cast<size_t>(
static_cast<double>(size) * 1000 / backgroundBps));
}
self->executingTask.reset();
self->enqueue(time, task);
for (auto&& func : task->partialRefreshCBs)
{
if (func)
{
func({});
}
}
task->partialRefreshCBs.clear();
if (complete)
{
for (auto&& func : task->refreshCBs)
{
if (func)
{
func({});
}
}
for (auto&& func : task->completeCBs)
{
if (func)
{
func({});
}
}
task->refreshCBs.clear();
task->refreshing = false;
}
});
});
}