blob: a3076af4ce2f4dac86ca746bfa8a05f1a61cbe59 [file] [log] [blame]
#ifndef THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SUBSCRIPTION_MANAGER_IMPL_H_
#define THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SUBSCRIPTION_MANAGER_IMPL_H_
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/container/btree_set.h"
#include "absl/container/flat_hash_map.h"
#include "absl/functional/any_invocable.h"
#include "absl/hash/hash.h"
#include "absl/log/log.h"
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "tlbmc/adapter/data_source.h"
#include "tlbmc/subscription/manager.h"
#include "identifier.pb.h"
#include "payload.pb.h"
#include "sensor_identifier.pb.h"
#include "sensor_payload.pb.h"
#include "subscription_params.pb.h"
#include "tlbmc/scheduler/scheduler.h"
namespace milotic_hft {
struct IdentifierHash {
size_t operator()(const milotic_hft::Identifier& identifier) const {
return absl::HashOf(absl::StrCat(identifier));
}
};
struct IdentifierEqual {
bool operator()(const milotic_hft::Identifier& a,
const milotic_hft::Identifier& b) const {
return absl::StrCat(a) == absl::StrCat(b);
}
};
// Forward declaration of ActiveSubscription.
class ActiveSubscription;
// Manages the sampling interval of a resource.
// It tracks the sampling interval of a resource and the subscribers
// of the resource. When a new subscriber is added, the effective sampling
// interval of the resource is updated to be the minimum of the sampling
// intervals of all subscribers.
// When a subscriber is removed, the effective sampling interval is updated to
// be the minimum of the sampling intervals of the remaining subscribers.
class ResourceMonitor {
public:
using SubscriptionId = SubscriptionManager::SubscriptionId;
explicit ResourceMonitor(const milotic_hft::Identifier& identifier,
DataSource* data_source)
: identifier_(identifier),
current_sampling_interval_ms_(0),
data_source_(data_source) {}
const milotic_hft::Identifier& GetIdentifier() const { return identifier_; }
absl::Status AddSubscriber(
const std::shared_ptr<ActiveSubscription>& subscription)
ABSL_LOCKS_EXCLUDED(subscribers_mutex_);
absl::Status RemoveSubscriber(const SubscriptionId& id)
ABSL_LOCKS_EXCLUDED(subscribers_mutex_);
absl::Status ConfigureDataSource() ABSL_LOCKS_EXCLUDED(subscribers_mutex_);
int GetCurrentSamplingIntervalMs() const {
return current_sampling_interval_ms_;
}
protected:
const milotic_hft::Identifier identifier_;
std::atomic<int> current_sampling_interval_ms_;
std::atomic<int> current_max_batch_size_;
absl::Mutex subscribers_mutex_;
absl::flat_hash_map<SubscriptionManager::SubscriptionId,
std::shared_ptr<ActiveSubscription>>
subscribers_ ABSL_GUARDED_BY(subscribers_mutex_);
absl::btree_multiset<int> lowest_sampling_interval_subscriptions_
ABSL_GUARDED_BY(subscribers_mutex_);
DataSource* data_source_;
absl::btree_multiset<int, std::greater<>> max_batch_sizes_
ABSL_GUARDED_BY(subscribers_mutex_);
};
class ActiveSubscription
: public std::enable_shared_from_this<ActiveSubscription> {
public:
using SubscriptionId = SubscriptionManager::SubscriptionId;
ActiveSubscription(
const SubscriptionId& id, const milotic_hft::SubscriptionParams& params,
absl::AnyInvocable<void(Payload&&)> on_data_callback,
absl::AnyInvocable<void(const SubscriptionId&, const absl::Status&)>
on_subscription_ended,
milotic_tlbmc::TaskScheduler* task_scheduler, DataSource* data_source);
~ActiveSubscription() {
task_scheduler_->Cancel(task_id_);
LOG(WARNING) << "ActiveSubscription " << id_ << " destroyed.";
}
void Begin();
SubscriptionId GetId() const { return id_; }
const milotic_hft::SubscriptionParams& GetParams() const { return params_; }
void DeliverData(Payload&& data);
bool IsActive() const { return batches_remaining_ != 0; }
void OnSubscriptionEnd(const SubscriptionId& id, const absl::Status& status) {
on_subscription_ended_callback_(id, status);
}
private:
std::atomic<int> task_id_ = -1; // -1 means no task is scheduled.
const SubscriptionId id_;
const milotic_hft::SubscriptionParams params_;
absl::AnyInvocable<void(Payload&&)> on_data_callback_;
absl::AnyInvocable<void(const SubscriptionId&, const absl::Status&)>
on_subscription_ended_callback_;
std::atomic<int> batches_remaining_ = -1; // -1 means unlimited.
absl::flat_hash_map<milotic_hft::Identifier, absl::Time, IdentifierHash,
IdentifierEqual>
identifier_to_last_sampled_time_
ABSL_GUARDED_BY(identifier_to_last_sampled_time_ms_mutex_);
absl::Mutex identifier_to_last_sampled_time_ms_mutex_;
milotic_tlbmc::TaskScheduler* task_scheduler_;
DataSource* data_source_;
};
class SubscriptionManagerImpl : public SubscriptionManager {
public:
using SubscriptionId = SubscriptionManager::SubscriptionId;
// Creates a subscription manager.
static std::unique_ptr<SubscriptionManagerImpl> Create(
std::unique_ptr<DataSource> data_source) {
return absl::WrapUnique(
new SubscriptionManagerImpl(std::move(data_source)));
}
absl::StatusOr<SubscriptionId> AddSubscription(
const SubscriptionParams& params,
absl::AnyInvocable<void(Payload&&)> on_data_callback) override
ABSL_LOCKS_EXCLUDED(active_subscriptions_mutex_);
absl::Status Unsubscribe(const SubscriptionId& sub_id) override
ABSL_LOCKS_EXCLUDED(active_subscriptions_mutex_);
~SubscriptionManagerImpl() override { task_scheduler_->Stop(); }
private:
explicit SubscriptionManagerImpl(std::unique_ptr<DataSource> data_source)
: next_subscription_id_(0),
task_scheduler_(std::make_unique<milotic_tlbmc::TaskScheduler>()),
data_source_(std::move(data_source)) {}
SubscriptionId GenerateSubscriptionId();
void OnSubscriptionEnd(const SubscriptionId& subscription_id,
const absl::Status& status)
ABSL_LOCKS_EXCLUDED(active_subscriptions_mutex_,
resource_monitors_mutex_);
std::atomic<size_t> next_subscription_id_;
std::unique_ptr<milotic_tlbmc::TaskScheduler> task_scheduler_;
absl::Mutex active_subscriptions_mutex_;
absl::flat_hash_map<SubscriptionId, std::shared_ptr<ActiveSubscription>>
active_subscriptions_ ABSL_GUARDED_BY(active_subscriptions_mutex_);
std::unique_ptr<DataSource> data_source_;
absl::Mutex resource_monitors_mutex_;
absl::flat_hash_map<milotic_hft::Identifier, std::shared_ptr<ResourceMonitor>,
IdentifierHash, IdentifierEqual>
resource_monitors_ ABSL_GUARDED_BY(resource_monitors_mutex_);
SubscriptionManager* manager_;
};
} // namespace milotic_hft
#endif // THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SUBSCRIPTION_MANAGER_IMPL_H_