| #pragma once |
| #include <boost/asio/steady_timer.hpp> |
| |
| #include <chrono> |
| #include <functional> |
| #include <iostream> |
| #include <memory> |
| #include <set> |
| #include <vector> |
| |
| constexpr unsigned backgroundBps = 4 * 1024; |
| template <class ClockType_> |
| class Scheduler; |
| |
| template <class ClockType_ = std::chrono::steady_clock> |
| class MetricBase : public std::enable_shared_from_this<MetricBase<ClockType_>> |
| { |
| // type definition |
| public: |
| using ClockType = ClockType_; |
| |
| // constructor |
| /** |
| * interval: refresh rate for the Metric |
| */ |
| MetricBase(std::shared_ptr<Scheduler<ClockType_>> scheduler, |
| ClockType_::duration interval) : |
| scheduler(scheduler), interval(interval) |
| { |
| // partialRefreshCBs.emplace_back(std::bind(scheduler->callback, |
| // nullptr)); |
| } |
| |
| virtual ~MetricBase() |
| { |
| for (auto&& func : partialRefreshCBs) |
| { |
| if (func) |
| { |
| func(std::make_error_code(std::errc::operation_canceled)); |
| } |
| } |
| for (auto&& func : refreshCBs) |
| { |
| if (func) |
| { |
| func(std::make_error_code(std::errc::operation_canceled)); |
| } |
| } |
| for (auto&& func : completeCBs) |
| { |
| if (func) |
| { |
| func(std::make_error_code(std::errc::operation_canceled)); |
| } |
| } |
| } |
| |
| // firendship |
| private: |
| friend class Scheduler<ClockType>; |
| // variables |
| protected: |
| std::shared_ptr<Scheduler<ClockType>> scheduler; |
| const ClockType::duration interval; |
| |
| std::optional< |
| std::pair<std::chrono::time_point<ClockType>, std::vector<uint8_t>>> |
| inprocessData; // start time, data |
| private: |
| bool refreshing = false; |
| std::vector<std::function<void(std::error_code ec)>> refreshCBs; |
| std::vector<std::function<void(std::error_code ec)>> partialRefreshCBs; |
| std::vector<std::function<void(std::error_code ec)>> completeCBs; |
| |
| /* methods */ |
| public: |
| // refresh the whole cache |
| void refresh(std::function<void(std::error_code ec)>&& cb); |
| |
| // refresh partial of the cache if the data is large. |
| void partialRefresh(std::function<void(std::error_code ec)>&& cb); |
| |
| /** |
| * register a callback which will be triggered for every refresh completion. |
| * the callback will NOT be removed after a single refresh completion |
| */ |
| void registerCompleteCB(std::function<void(std::error_code ec)>&& cb) |
| { |
| completeCBs.push_back(std::move(cb)); |
| } |
| |
| bool isRefreshing() const; |
| // bool isPartialRefreshing() const = 0; |
| |
| protected: |
| // read data from device and update the |
| // implemetation should extend this virtual function and do the real data |
| // fetch from device. |
| virtual void readDevice( |
| std::function<void(std::error_code ec, size_t size, bool complete)>&& |
| cb) noexcept = 0; |
| |
| public: |
| // interface |
| virtual constexpr std::string_view getIdentifier() const noexcept = 0; |
| virtual bool isCacheValid() const noexcept = 0; |
| virtual std::tuple<std::chrono::time_point<ClockType>, |
| std::chrono::time_point<ClockType>, |
| std::span<const uint8_t>> |
| getCache() const noexcept = 0; |
| }; |
| |
| template <class ClockType_ = std::chrono::steady_clock> |
| class Scheduler : public std::enable_shared_from_this<Scheduler<ClockType_>> |
| { |
| public: |
| using ClockType = ClockType_; |
| |
| using ScheduledTask = std::pair<std::chrono::time_point<ClockType>, |
| std::weak_ptr<MetricBase<ClockType>>>; |
| |
| // template <class ClockType> |
| explicit Scheduler(boost::asio::io_context& io); |
| ~Scheduler() = default; |
| |
| // find the schuduler with give path for a storage componenet (e.g. |
| // Controller) |
| static std::shared_ptr<Scheduler<ClockType_>> |
| getScheduler(const std::string& path); |
| |
| void enqueue(std::chrono::time_point<ClockType> scheduledTime, |
| std::shared_ptr<MetricBase<ClockType>> task); |
| void dequeue(); |
| // void callback(std::shared_ptr<MetricBase<ClockType>> task, size_t size); |
| void start() |
| { |
| started = true; |
| } |
| |
| private: |
| friend class MetricBase<ClockType_>; |
| |
| boost::asio::io_context& io; |
| boost::asio::steady_timer timer; |
| bool started = false; |
| // Task Queue, sorted by Scheduled Time. |
| std::multiset<ScheduledTask, |
| bool (*)(const ScheduledTask&, const ScheduledTask&)> |
| taskQueue; |
| std::vector<std::shared_ptr<bool>> validFlags; |
| |
| std::weak_ptr<MetricBase<ClockType>> |
| executingTask; // the metric is currently under execution. |
| }; |
| |
| template <class ClockType_> |
| void MetricBase<ClockType_>::refresh( |
| std::function<void(std::error_code ec)>&& cb) |
| { |
| refreshing = true; |
| |
| refreshCBs.push_back(std::move(cb)); |
| scheduler->enqueue(ClockType_::now(), this->shared_from_this()); |
| } |
| |
| template <class ClockType_> |
| void MetricBase<ClockType_>::partialRefresh( |
| std::function<void(std::error_code ec)>&& cb) |
| { |
| partialRefreshCBs.push_back(std::move(cb)); |
| scheduler->enqueue(ClockType_::now(), this->shared_from_this()); |
| } |
| template <class ClockType_> |
| bool MetricBase<ClockType_>::isRefreshing() const |
| { |
| return refreshing; |
| } |
| |
| // |
| // void ::callback( |
| // std::shared_ptr<MetricBase<ClockType>> task, size_t size) |
| // {} |
| |
| template <class ClockType> |
| bool compareScheduleTask(const typename Scheduler<ClockType>::ScheduledTask& a, |
| const typename Scheduler<ClockType>::ScheduledTask& b) |
| { |
| return a.first < b.first; |
| } |
| |
| template <class ClockType_> |
| Scheduler<ClockType_>::Scheduler(boost::asio::io_context& io) : |
| io(io), timer(io), taskQueue(&compareScheduleTask<ClockType_>) |
| {} |
| |
| template <class ClockType_> |
| void Scheduler<ClockType_>::enqueue( |
| std::chrono::time_point<ClockType> scheduledTime, |
| std::shared_ptr<MetricBase<ClockType>> task) |
| { |
| // check if the task is being executed. |
| if (!executingTask.owner_before(task) && !task.owner_before(executingTask)) |
| { |
| return; |
| } |
| timer.cancel(); |
| validFlags.clear(); |
| |
| assert(task && "insert invalid task into schduler"); |
| // remove duplicate and find the earliest schdule time |
| std::chrono::time_point<ClockType> time = scheduledTime; |
| for (auto itr = taskQueue.begin(); itr != taskQueue.end();) |
| { |
| auto metric = itr->second.lock(); |
| if (!metric) |
| { |
| // LOG_INFO |
| std::cerr << "the task is invalid, removed from task queue\n"; |
| itr = taskQueue.erase(itr); |
| } |
| else if (!metric.owner_before(task) && |
| !task.owner_before(metric)) // find duplicate task |
| { |
| time = std::min(time, itr->first); |
| itr = taskQueue.erase(itr); |
| } |
| else |
| { |
| itr++; |
| } |
| } |
| |
| taskQueue.emplace(time, task); |
| |
| if (started) |
| { |
| dequeue(); |
| } |
| } |
| template <class ClockType_> |
| void Scheduler<ClockType_>::dequeue() |
| { |
| if (taskQueue.empty()) |
| { |
| return; |
| } |
| const std::pair<std::chrono::time_point<ClockType_>, |
| std::weak_ptr<MetricBase<ClockType_>>>& head = |
| *taskQueue.begin(); |
| validFlags.emplace_back(std::make_shared<bool>(true)); |
| std::weak_ptr<bool> weakFlag = validFlags.back(); |
| |
| timer.expires_at(head.first); |
| timer.async_wait([weakSelf{this->weak_from_this()}, |
| weakFlag](boost::system::error_code ec) { |
| if (ec || weakFlag.expired()) |
| { |
| // LOG_INFO |
| std::cerr << "new expiration time set\n"; |
| return; |
| } |
| |
| auto self = weakSelf.lock(); |
| |
| if (!self) |
| { |
| // LOG_INFO |
| std::cerr << "the scheduler has stopped\n"; |
| return; |
| } |
| |
| auto node = self->taskQueue.extract(self->taskQueue.begin()); |
| std::weak_ptr<MetricBase<ClockType_>> weakTask = node.value().second; |
| auto task = weakTask.lock(); |
| if (!task) |
| { |
| // LOG_INFO |
| std::cerr << "the task is invalid, removed from task queue\n"; |
| self->dequeue(); |
| return; |
| } |
| self->executingTask = task; |
| |
| task->readDevice( |
| [self{self}, weakTask{weakTask}](std::error_code /* ec */, |
| size_t size, bool complete) { |
| // todo: deal with ec |
| |
| std::shared_ptr<MetricBase<ClockType_>> task = weakTask.lock(); |
| if (!task) |
| { |
| std::cerr << "the task is invalid, removed from task queue\n"; |
| self->executingTask.reset(); |
| self->dequeue(); |
| return; |
| } |
| |
| // calcuate the next schedule time |
| std::chrono::time_point<ClockType_> time; |
| if (complete) |
| { |
| if (task->interval == ClockType_::duration::max()) |
| { |
| time = ClockType_::time_point::max(); |
| } |
| else |
| { |
| time = ClockType_::now() + task->interval; |
| } |
| } |
| else if (task->refreshing) |
| { |
| time = ClockType_::now(); |
| } |
| else |
| { |
| time = ClockType_::now() + |
| std::chrono::milliseconds(static_cast<size_t>( |
| static_cast<double>(size) * 1000 / backgroundBps)); |
| } |
| |
| self->executingTask.reset(); |
| self->enqueue(time, task); |
| |
| for (auto&& func : task->partialRefreshCBs) |
| { |
| if (func) |
| { |
| func({}); |
| } |
| } |
| task->partialRefreshCBs.clear(); |
| |
| if (complete) |
| { |
| for (auto&& func : task->refreshCBs) |
| { |
| if (func) |
| { |
| func({}); |
| } |
| } |
| for (auto&& func : task->completeCBs) |
| { |
| if (func) |
| { |
| func({}); |
| } |
| } |
| task->refreshCBs.clear(); |
| task->refreshing = false; |
| } |
| }); |
| }); |
| } |