| #ifndef THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SUBSCRIPTION_MANAGER_IMPL_H_ |
| #define THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SUBSCRIPTION_MANAGER_IMPL_H_ |
| |
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <utility>

#include "absl/base/thread_annotations.h"
#include "absl/container/btree_set.h"
#include "absl/container/flat_hash_map.h"
#include "absl/functional/any_invocable.h"
#include "absl/hash/hash.h"
#include "absl/log/log.h"
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "tlbmc/adapter/data_source.h"
#include "tlbmc/subscription/manager.h"
#include "identifier.pb.h"
#include "payload.pb.h"
#include "sensor_identifier.pb.h"
#include "sensor_payload.pb.h"
#include "subscription_params.pb.h"
#include "tlbmc/scheduler/scheduler.h"
| |
| namespace milotic_hft { |
| |
| struct IdentifierHash { |
| size_t operator()(const milotic_hft::Identifier& identifier) const { |
| return absl::HashOf(absl::StrCat(identifier)); |
| } |
| }; |
| |
| struct IdentifierEqual { |
| bool operator()(const milotic_hft::Identifier& a, |
| const milotic_hft::Identifier& b) const { |
| return absl::StrCat(a) == absl::StrCat(b); |
| } |
| }; |
| |
| // Forward declaration of ActiveSubscription. |
| class ActiveSubscription; |
| |
| // Manages the sampling interval of a resource. |
| // It tracks the sampling interval of a resource and the subscribers |
| // of the resource. When a new subscriber is added, the effective sampling |
| // interval of the resource is updated to be the minimum of the sampling |
| // intervals of all subscribers. |
| // When a subscriber is removed, the effective sampling interval is updated to |
| // be the minimum of the sampling intervals of the remaining subscribers. |
| class ResourceMonitor { |
| public: |
| using SubscriptionId = SubscriptionManager::SubscriptionId; |
| |
| explicit ResourceMonitor(const milotic_hft::Identifier& identifier, |
| DataSource* data_source) |
| : identifier_(identifier), |
| current_sampling_interval_ms_(0), |
| data_source_(data_source) {} |
| |
| const milotic_hft::Identifier& GetIdentifier() const { return identifier_; } |
| |
| absl::Status AddSubscriber( |
| const std::shared_ptr<ActiveSubscription>& subscription) |
| ABSL_LOCKS_EXCLUDED(subscribers_mutex_); |
| |
| absl::Status RemoveSubscriber(const SubscriptionId& id) |
| ABSL_LOCKS_EXCLUDED(subscribers_mutex_); |
| |
| absl::Status ConfigureDataSource() ABSL_LOCKS_EXCLUDED(subscribers_mutex_); |
| |
| int GetCurrentSamplingIntervalMs() const { |
| return current_sampling_interval_ms_; |
| } |
| |
| protected: |
| const milotic_hft::Identifier identifier_; |
| std::atomic<int> current_sampling_interval_ms_; |
| std::atomic<int> current_max_batch_size_; |
| absl::Mutex subscribers_mutex_; |
| absl::flat_hash_map<SubscriptionManager::SubscriptionId, |
| std::shared_ptr<ActiveSubscription>> |
| subscribers_ ABSL_GUARDED_BY(subscribers_mutex_); |
| absl::btree_multiset<int> lowest_sampling_interval_subscriptions_ |
| ABSL_GUARDED_BY(subscribers_mutex_); |
| DataSource* data_source_; |
| absl::btree_multiset<int, std::greater<>> max_batch_sizes_ |
| ABSL_GUARDED_BY(subscribers_mutex_); |
| }; |
| |
| class ActiveSubscription |
| : public std::enable_shared_from_this<ActiveSubscription> { |
| public: |
| using SubscriptionId = SubscriptionManager::SubscriptionId; |
| |
| ActiveSubscription( |
| const SubscriptionId& id, const milotic_hft::SubscriptionParams& params, |
| absl::AnyInvocable<void(Payload&&)> on_data_callback, |
| absl::AnyInvocable<void(const SubscriptionId&, const absl::Status&)> |
| on_subscription_ended, |
| milotic_tlbmc::TaskScheduler* task_scheduler, DataSource* data_source); |
| |
| ~ActiveSubscription() { |
| task_scheduler_->Cancel(task_id_); |
| LOG(WARNING) << "ActiveSubscription " << id_ << " destroyed."; |
| } |
| |
| void Begin(); |
| |
| SubscriptionId GetId() const { return id_; } |
| |
| const milotic_hft::SubscriptionParams& GetParams() const { return params_; } |
| |
| void DeliverData(Payload&& data); |
| |
| bool IsActive() const { return batches_remaining_ != 0; } |
| |
| void OnSubscriptionEnd(const SubscriptionId& id, const absl::Status& status) { |
| on_subscription_ended_callback_(id, status); |
| } |
| |
| private: |
| std::atomic<int> task_id_ = -1; // -1 means no task is scheduled. |
| const SubscriptionId id_; |
| const milotic_hft::SubscriptionParams params_; |
| absl::AnyInvocable<void(Payload&&)> on_data_callback_; |
| absl::AnyInvocable<void(const SubscriptionId&, const absl::Status&)> |
| on_subscription_ended_callback_; |
| std::atomic<int> batches_remaining_ = -1; // -1 means unlimited. |
| absl::flat_hash_map<milotic_hft::Identifier, absl::Time, IdentifierHash, |
| IdentifierEqual> |
| identifier_to_last_sampled_time_ |
| ABSL_GUARDED_BY(identifier_to_last_sampled_time_ms_mutex_); |
| absl::Mutex identifier_to_last_sampled_time_ms_mutex_; |
| milotic_tlbmc::TaskScheduler* task_scheduler_; |
| DataSource* data_source_; |
| }; |
| |
| class SubscriptionManagerImpl : public SubscriptionManager { |
| public: |
| using SubscriptionId = SubscriptionManager::SubscriptionId; |
| |
| // Creates a subscription manager. |
| static std::unique_ptr<SubscriptionManagerImpl> Create( |
| std::unique_ptr<DataSource> data_source) { |
| return absl::WrapUnique( |
| new SubscriptionManagerImpl(std::move(data_source))); |
| } |
| |
| absl::StatusOr<SubscriptionId> AddSubscription( |
| const SubscriptionParams& params, |
| absl::AnyInvocable<void(Payload&&)> on_data_callback) override |
| ABSL_LOCKS_EXCLUDED(active_subscriptions_mutex_); |
| |
| absl::Status Unsubscribe(const SubscriptionId& sub_id) override |
| ABSL_LOCKS_EXCLUDED(active_subscriptions_mutex_); |
| |
| ~SubscriptionManagerImpl() override { task_scheduler_->Stop(); } |
| |
| private: |
| explicit SubscriptionManagerImpl(std::unique_ptr<DataSource> data_source) |
| : next_subscription_id_(0), |
| task_scheduler_(std::make_unique<milotic_tlbmc::TaskScheduler>()), |
| data_source_(std::move(data_source)) {} |
| |
| SubscriptionId GenerateSubscriptionId(); |
| void OnSubscriptionEnd(const SubscriptionId& subscription_id, |
| const absl::Status& status) |
| ABSL_LOCKS_EXCLUDED(active_subscriptions_mutex_, |
| resource_monitors_mutex_); |
| |
| std::atomic<size_t> next_subscription_id_; |
| std::unique_ptr<milotic_tlbmc::TaskScheduler> task_scheduler_; |
| absl::Mutex active_subscriptions_mutex_; |
| absl::flat_hash_map<SubscriptionId, std::shared_ptr<ActiveSubscription>> |
| active_subscriptions_ ABSL_GUARDED_BY(active_subscriptions_mutex_); |
| std::unique_ptr<DataSource> data_source_; |
| |
| absl::Mutex resource_monitors_mutex_; |
| absl::flat_hash_map<milotic_hft::Identifier, std::shared_ptr<ResourceMonitor>, |
| IdentifierHash, IdentifierEqual> |
| resource_monitors_ ABSL_GUARDED_BY(resource_monitors_mutex_); |
| SubscriptionManager* manager_; |
| }; |
| |
| } // namespace milotic_hft |
| |
| #endif // THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SUBSCRIPTION_MANAGER_IMPL_H_ |