blob: 1005a3d263333471874ca7d6d3d5e3a79a776951 [file] [log] [blame]
#include "tlbmc/subscription/manager_impl.h"
#include <sys/param.h>
#include <unistd.h>
#include <atomic>
#include <cmath>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "tlbmc/feed_payload.pb.h"
#include "tlbmc/collection_entity.pb.h"
#include "tlbmc/feed_client_interface.h"
#include "tlbmc/hft_subscription.pb.h"
#include "tlbmc/client_subscription.pb.h"
#include "tlbmc/service_messages.pb.h"
#include "one/offline_node_entities.pb.h"
#include "one/resolved_entities.pb.h"
#include "absl/container/btree_map.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/functional/any_invocable.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "g3/macros.h"
#include "nlohmann/json_fwd.hpp"
#include "tlbmc/adapter/data_source.h"
#include "tlbmc/subscription/manager.h"
#include "identifier.pb.h"
#include "payload.pb.h"
#include "sensor_payload.pb.h"
#include "subscription_params.pb.h"
#include "resource.pb.h"
#include "tlbmc/scheduler/scheduler.h"
namespace milotic_hft {
namespace {
using ::platforms_syshealth::collection::feed::FeedClientInterface;
using ::platforms_syshealth::collection::feed::GetPoliciesRequest;
using ::platforms_syshealth::collection::feed::GetPoliciesResponse;
absl::StatusOr<absl::Time> GetLastSampledTime(const Payload& data) {
if (data.has_high_frequency_sensors_readings_batch()) {
const auto& sensors =
data.high_frequency_sensors_readings_batch().high_frequency_sensors();
if (sensors.empty()) {
return absl::NotFoundError("No sensors found in the batch.");
}
const auto& readings = sensors.rbegin()->timestamped_readings();
if (readings.empty()) {
return absl::NotFoundError("No readings found.");
}
// Get the last sampled time from the last sensor.
return absl::FromUnixNanos(readings.rbegin()->timestamp_ns());
}
return absl::InvalidArgumentError("Unsupported payload type.");
}
absl::StatusOr<std::vector<HighFrequencySensorReading>>
ResampleHighFrequencySensorsReadings(
const HighFrequencySensorsReadings& readings, int sampling_interval_ms) {
std::vector<HighFrequencySensorReading> resampled_readings;
const auto& all_readings = readings.timestamped_readings();
if (all_readings.empty()) {
return absl::InvalidArgumentError("No readings found.");
}
// Convert sampling interval from milliseconds to nanoseconds
const int64_t sampling_interval_ns = sampling_interval_ms * 1000000;
// Store the timestamp of the first reading to normalize bin calculations
const int64_t first_reading_timestamp_ns =
all_readings.begin()->timestamp_ns();
// Map to store: bin_start_timestamp_ns -> {HighFrequencySensorReading,
// absolute_diff_to_midpoint_ns} btree_map automatically keeps keys sorted,
// ensuring output is chronological.
absl::btree_map<int64_t, std::pair<HighFrequencySensorReading, int64_t>>
best_reading_for_bin;
for (const auto& current_reading : all_readings) {
int64_t current_timestamp_ns = current_reading.timestamp_ns();
// Calculate the start timestamp of the bin this reading belongs to.
// We normalize by `first_reading_timestamp_ns` to ensure bins start
// relative to the earliest data point, preventing large initial empty bins
// if startTime != 0.
int64_t relative_time_ns =
current_timestamp_ns - first_reading_timestamp_ns;
int64_t bin_index = static_cast<int64_t>(
std::floor((relative_time_ns) / sampling_interval_ns));
int64_t bin_start_timestamp_ns =
first_reading_timestamp_ns + bin_index * sampling_interval_ns;
// Calculate the midpoint of this bin
int64_t bin_midpoint_ns = bin_start_timestamp_ns + sampling_interval_ns / 2;
// Calculate the absolute difference from the midpoint
int64_t diff_from_midpoint_ns =
std::abs(current_timestamp_ns - bin_midpoint_ns);
// Check if this bin already has a best reading, or if this reading is
// closer
auto it = best_reading_for_bin.find(bin_start_timestamp_ns);
if (it == best_reading_for_bin.end() ||
diff_from_midpoint_ns < it->second.second) {
// This is the first reading for this bin, or it's closer than the
// previous best
best_reading_for_bin[bin_start_timestamp_ns] = {current_reading,
diff_from_midpoint_ns};
}
}
// Collect the selected readings, ordered by bin start time (due to std::map's
// nature)
for (const auto& pair_entry : best_reading_for_bin) {
resampled_readings.push_back(pair_entry.second.first);
}
return resampled_readings;
}
absl::StatusOr<Payload> ResampleReadings(const Payload& raw_data,
int sampling_interval_ms) {
Payload resampled_data;
// Resample high frequency sensors readings.
if (raw_data.has_high_frequency_sensors_readings_batch()) {
for (const auto& sensor : raw_data.high_frequency_sensors_readings_batch()
.high_frequency_sensors()) {
ECCLESIA_ASSIGN_OR_RETURN(
std::vector<HighFrequencySensorReading> resampled_readings,
ResampleHighFrequencySensorsReadings(sensor, sampling_interval_ms));
HighFrequencySensorsReadings* new_sensor_with_resampled_readings =
resampled_data.mutable_high_frequency_sensors_readings_batch()
->add_high_frequency_sensors();
*new_sensor_with_resampled_readings->mutable_sensor_identifier() =
sensor.sensor_identifier();
*new_sensor_with_resampled_readings->mutable_timestamped_readings() = {
resampled_readings.begin(), resampled_readings.end()};
}
}
// Other payload types can be resampled here.
// For now, we only support high frequency sensors readings.
return resampled_data;
}
} // namespace
// ResourceMonitor implementation.
absl::Status ResourceMonitor::AddSubscriber(
const std::shared_ptr<ActiveSubscription>& subscription) {
LOG(WARNING) << "Adding subscriber " << subscription->GetId() << " to "
<< identifier_;
{
absl::MutexLock lock(subscribers_mutex_);
subscribers_.emplace(subscription->GetId(), subscription);
lowest_sampling_interval_subscriptions_.insert(
subscription->GetParams().sampling_interval_ms());
// Calculate the batch size for the data source.
int current_batch_size = subscription->GetParams().export_interval_ms() /
subscription->GetParams().sampling_interval_ms();
max_batch_sizes_.insert(current_batch_size);
LOG(WARNING) << "Max batch size for " << identifier_ << " is "
<< current_batch_size;
}
ECCLESIA_RETURN_IF_ERROR(ConfigureDataSource());
return absl::OkStatus();
}
absl::Status ResourceMonitor::RemoveSubscriber(const SubscriptionId& id) {
int sampling_interval_ms_to_remove;
{
absl::MutexLock lock(subscribers_mutex_);
LOG(WARNING) << "Removing subscriber " << id << " from " << identifier_
<< " with sampling interval " << current_sampling_interval_ms_;
auto it = subscribers_.find(id);
if (it == subscribers_.end()) {
return absl::InvalidArgumentError(
absl::StrCat("Subscriber ", id, " not found."));
}
// Get the sampling interval for the subscription.
sampling_interval_ms_to_remove =
it->second->GetParams().sampling_interval_ms();
// Get the batch size for the subscription.
int current_batch_size = it->second->GetParams().export_interval_ms() /
it->second->GetParams().sampling_interval_ms();
// Remove the subscription from the set of subscribers.
subscribers_.erase(it);
// Remove the sampling interval from the set of lowest sampling intervals.
lowest_sampling_interval_subscriptions_.erase(
lowest_sampling_interval_subscriptions_.find(
sampling_interval_ms_to_remove));
// Remove the batch size from the set of max batch sizes for the data
// source.
max_batch_sizes_.erase(max_batch_sizes_.find(current_batch_size));
}
ECCLESIA_RETURN_IF_ERROR(ConfigureDataSource());
return absl::OkStatus();
}
nlohmann::json ResourceMonitor::ToJson() const {
nlohmann::json json;
json["identifier"] = absl::StrCat(identifier_);
json["current_sampling_interval_ms"] = current_sampling_interval_ms_.load();
json["current_max_batch_size"] = current_max_batch_size_.load();
{
absl::MutexLock lock(subscribers_mutex_);
json["num_subscribers"] = subscribers_.size();
nlohmann::json subscribers = nlohmann::json::array();
for (const auto& [id, sub] : subscribers_) {
subscribers.push_back(id);
}
json["subscribers"] = subscribers;
json["lowest_sampling_interval_subscriptions"] = nlohmann::json::array();
for (const auto& sampling_interval_ms :
lowest_sampling_interval_subscriptions_) {
json["lowest_sampling_interval_subscriptions"].push_back(
sampling_interval_ms);
}
json["max_batch_sizes"] = nlohmann::json::array();
for (const auto& max_batch_size : max_batch_sizes_) {
json["max_batch_sizes"].push_back(max_batch_size);
}
}
return json;
}
absl::Status ResourceMonitor::ConfigureDataSource() {
absl::MutexLock lock(subscribers_mutex_);
int lowest_sampling_interval_ms =
lowest_sampling_interval_subscriptions_.empty()
? -1
: *lowest_sampling_interval_subscriptions_.begin();
// Update the max batch size for the data source.
int max_batch_size =
max_batch_sizes_.empty() ? -1 : *max_batch_sizes_.begin();
bool set_max_batch_size =
max_batch_size != current_max_batch_size_ && max_batch_size != -1;
bool set_sampling_interval =
lowest_sampling_interval_ms != current_sampling_interval_ms_ &&
lowest_sampling_interval_ms != -1;
bool reset_max_batch_size = max_batch_size == -1;
bool reset_sampling_interval = lowest_sampling_interval_ms == -1;
if (set_max_batch_size) {
LOG(WARNING) << "Setting max batch size for " << identifier_ << " to "
<< max_batch_size;
ECCLESIA_RETURN_IF_ERROR(
data_source_->ConfigureBatchSize(identifier_, max_batch_size));
} else if (reset_max_batch_size) {
ECCLESIA_RETURN_IF_ERROR(
data_source_->ResetBatchSizeToDefault(identifier_));
}
if (set_sampling_interval) {
LOG(WARNING) << "Setting effective sampling interval for " << identifier_
<< " to " << lowest_sampling_interval_ms;
ECCLESIA_RETURN_IF_ERROR(data_source_->ConfigureSamplingInterval(
identifier_, lowest_sampling_interval_ms));
} else if (reset_sampling_interval) {
ECCLESIA_RETURN_IF_ERROR(
data_source_->ResetSamplingIntervalToDefault(identifier_));
}
current_sampling_interval_ms_ = lowest_sampling_interval_ms;
current_max_batch_size_ = max_batch_size;
return absl::OkStatus();
}
SubscriptionManagerImpl::SubscriptionManagerImpl(
std::unique_ptr<DataSource> data_source)
: next_subscription_id_(0),
task_scheduler_(std::make_unique<milotic_tlbmc::TaskScheduler>()),
data_source_(std::move(data_source)) {}
SubscriptionManagerImpl::SubscriptionManagerImpl(
std::unique_ptr<DataSource> data_source, FeedClientInterface* feed_client,
const Options& options,
const platforms_syshealth::collection::feed::NodeEntityId& node_entity_id)
: next_subscription_id_(0),
task_scheduler_(std::make_unique<milotic_tlbmc::TaskScheduler>()),
data_source_(std::move(data_source)),
feed_client_(feed_client),
get_policies_interval_(options.get_policies_interval),
endpoint_type_(options.endpoint_type),
node_entity_id_(node_entity_id) {
if (feed_client_ != nullptr) {
get_policies_task_id_ = task_scheduler_->ScheduleAsync(
[this](absl::AnyInvocable<void()> on_done) {
PullAgentPoliciesAndUpdateSubscriptions();
on_done();
},
get_policies_interval_);
}
}
SubscriptionManagerImpl::~SubscriptionManagerImpl() {
if (get_policies_task_id_ != -1) {
task_scheduler_->Cancel(get_policies_task_id_);
}
task_scheduler_->Stop();
}
void SubscriptionManagerImpl::PullAgentPoliciesAndUpdateSubscriptions() {
if (feed_client_ == nullptr) {
return;
}
LOG(INFO) << "Getting policies from agent config service";
GetPoliciesRequest request;
request.mutable_endpoint_identifier()->set_endpoint_type(endpoint_type_);
*request.mutable_entity_id()->mutable_node_entity() = node_entity_id_;
absl::StatusOr<GetPoliciesResponse> response =
feed_client_->GetPolicies(request);
if (!response.ok()) {
LOG(ERROR) << "Failed to get policies: " << response.status();
return;
}
LOG(INFO) << "Received " << response->subscription_policies_size()
<< " policies";
absl::flat_hash_map<
std::string,
const platforms_syshealth::collection::feed::ClientSubscriptionPolicy*>
response_policies;
for (const auto& policy : response->subscription_policies()) {
if (policy.has_hft_subscription()) {
response_policies[policy.policy_id()] = &policy;
}
}
std::vector<platforms_syshealth::collection::feed::ClientSubscriptionPolicy>
policies_to_add;
std::vector<std::pair<std::string, SubscriptionId>> sub_ids_to_remove;
{
absl::MutexLock lock(&active_subscriptions_mutex_);
for (const auto& [policy_id, policy] : response_policies) {
if (!policy_to_subscription_ids_.contains(policy_id)) {
policies_to_add.push_back(*policy);
}
}
for (const auto& [policy_id, sub_id] : policy_to_subscription_ids_) {
if (!response_policies.contains(policy_id)) {
sub_ids_to_remove.push_back({policy_id, sub_id});
} else if (absl::StrCat(response_policies.at(policy_id)
->hft_subscription()
.subscription_params()) !=
absl::StrCat(active_subscriptions_.at(sub_id)->GetParams())) {
LOG(INFO) << "Policy " << policy_id
<< " has changed, updating subscription";
sub_ids_to_remove.push_back({policy_id, sub_id});
policies_to_add.push_back(*response_policies.at(policy_id));
}
}
}
for (const auto& policy : policies_to_add) {
LOG(INFO) << "Adding subscription for policy " << policy.policy_id();
absl::StatusOr<SubscriptionId> sub_id = AddSubscription(
policy.hft_subscription().subscription_params(), nullptr);
if (sub_id.ok()) {
absl::MutexLock lock(&active_subscriptions_mutex_);
// If subscription already ended, it might have been removed from
// active_subscriptions_, in which case we don't add it to
// policy_to_subscription_ids_.
if (active_subscriptions_.contains(*sub_id)) {
policy_to_subscription_ids_[policy.policy_id()] = *sub_id;
subscription_to_policy_ids_[*sub_id] = policy.policy_id();
}
} else {
LOG(ERROR) << "Failed to add subscription for policy "
<< policy.policy_id() << ": " << sub_id.status();
}
}
for (const auto& [policy_id, sub_id] : sub_ids_to_remove) {
LOG(INFO) << "Removing subscription for policy " << policy_id;
absl::Status status = Unsubscribe(sub_id);
if (!status.ok() && !absl::IsNotFound(status)) {
LOG(ERROR) << "Failed to unsubscribe policy " << policy_id
<< " with subscription id " << sub_id << ": " << status;
}
}
}
// ActiveSubscription implementation.
void ActiveSubscription::DeliverData(Payload&& raw_data) {
if (batches_remaining_ == 0) {
return;
}
// Deliver the data to the client.
on_data_callback_(std::move(raw_data));
// If batches_remaining_ is -1, the subscription is indefinite and will be
// ended when the client unsubscribes.
if (batches_remaining_ < 0) {
return;
}
// Subscription is limited to a fixed number of batches.
// If the number of batches is reached, the subscription will be ended.
--batches_remaining_;
DLOG(INFO) << "Subscription " << id_ << " delivered batch. "
<< batches_remaining_ << " batches remaining.";
if (batches_remaining_ == 0) {
on_subscription_ended_callback_(id_, absl::OkStatus());
}
}
// ActiveSubscription implementation.
ActiveSubscription::ActiveSubscription(
const SubscriptionId& id, const milotic_hft::SubscriptionParams& params,
absl::AnyInvocable<void(Payload&&)> on_data_callback,
absl::AnyInvocable<void(const SubscriptionId&, const absl::Status&)>
on_subscription_ended,
milotic_tlbmc::TaskScheduler* task_scheduler, DataSource* data_source)
: id_(id),
params_(params),
on_data_callback_(std::move(on_data_callback)),
on_subscription_ended_callback_(std::move(on_subscription_ended)),
batches_remaining_(params.num_batches() == 0 ? -1 : params.num_batches()),
task_scheduler_(task_scheduler),
data_source_(data_source) {
LOG(WARNING) << "ActiveSubscription " << id_
<< " created. num_batches: " << params_.num_batches();
}
// ActiveSubscription implementation.
void ActiveSubscription::Begin() {
// Schedule all the tasks for this subscription.
// The task calls the data source to collect the data and then goes over
// the data and downsample to the desired sampling interval in the
// subscription params.
auto task = [weak_self = weak_from_this()](
absl::AnyInvocable<void()> on_done) mutable {
std::shared_ptr<ActiveSubscription> self = weak_self.lock();
if (self == nullptr) {
on_done();
return;
}
// Collect the data for all the identifiers in the subscription params.
Payload consolidated_data;
for (const auto& identifier : self->params_.identifiers()) {
absl::Time last_sampled_time;
{
absl::MutexLock lock(self->identifier_to_last_sampled_time_ms_mutex_);
last_sampled_time = self->identifier_to_last_sampled_time_[identifier];
}
absl::StatusOr<Payload> data =
self->data_source_->Collect(identifier, last_sampled_time);
if (!data.ok() || data->ByteSizeLong() == 0) {
LOG(ERROR) << "Failed to collect data for subscription " << self->id_
<< ": " << data.status();
// In this case, there is no valid payload created. Create a placeholder
// payload with STATUS_STALE and status message to be able to deliver.
Payload empty_payload;
HighFrequencySensorsReadings* sensor_readings =
empty_payload.mutable_high_frequency_sensors_readings_batch()
->mutable_high_frequency_sensors()
->Add();
*sensor_readings->mutable_sensor_identifier() =
identifier.sensor_identifier();
milotic_hft::State state;
state.set_status(milotic_hft::STATUS_MISSING);
state.set_status_message(
absl::StrCat("Failed to collect data for identifier ", identifier,
" with status: ", data.status()));
*sensor_readings->mutable_state() = state;
consolidated_data.MergeFrom(empty_payload);
continue;
}
// Resample the data to the desired sampling interval.
absl::StatusOr<Payload> resampled_data = *data;
if (!data->has_high_frequency_sensors_readings_batch() ||
data->high_frequency_sensors_readings_batch()
.configured_sampling_interval_ms() !=
self->params_.sampling_interval_ms()) {
resampled_data =
ResampleReadings(*data, self->params_.sampling_interval_ms());
if (!resampled_data.ok()) {
LOG(ERROR) << "Failed to resample data for subscription " << self->id_
<< ": " << resampled_data.status();
consolidated_data.MergeFrom(*data);
continue;
}
}
// Get last sampled time for the identifier from the resampled data.
absl::StatusOr<absl::Time> updated_sample_time =
GetLastSampledTime(*resampled_data);
if (!updated_sample_time.ok()) {
LOG(ERROR) << "Failed to get last sampled time for subscription "
<< self->id_ << ": " << updated_sample_time.status();
consolidated_data.MergeFrom(*resampled_data);
continue;
}
last_sampled_time = *updated_sample_time;
// Merge the data into the consolidated data as there may be multiple
// identifiers in the subscription params.
consolidated_data.MergeFrom(*resampled_data);
{
absl::MutexLock lock(self->identifier_to_last_sampled_time_ms_mutex_);
self->identifier_to_last_sampled_time_[identifier] = last_sampled_time;
}
}
self->DeliverData(std::move(consolidated_data));
on_done();
};
task_id_ = task_scheduler_->ScheduleAsync(
task, absl::Milliseconds(params_.export_interval_ms()));
}
nlohmann::json ActiveSubscription::ToJson() const {
nlohmann::json json;
json["id"] = id_;
json["params"] = absl::StrCat(params_);
json["batches_remaining"] = batches_remaining_.load();
json["data_source"] = data_source_->GetName();
absl::MutexLock lock(identifier_to_last_sampled_time_ms_mutex_);
for (const auto& [identifier, last_sampled_time] :
identifier_to_last_sampled_time_) {
nlohmann::json entry;
entry["identifier"] = absl::StrCat(identifier);
entry["last_sampled_time"] = absl::FormatTime(last_sampled_time);
json["identifier_to_last_sampled_time"].push_back(entry);
}
return json;
}
void SubscriptionManagerImpl::OnSubscriptionEnd(
const SubscriptionId& subscription_id, const absl::Status& status) {
std::shared_ptr<ActiveSubscription> sub_to_remove;
{
absl::MutexLock lock(active_subscriptions_mutex_);
auto it = active_subscriptions_.find(subscription_id);
if (it == active_subscriptions_.end()) {
return;
}
sub_to_remove = it->second;
active_subscriptions_.erase(it);
// If subscription is from policy, remove from policy_to_subscription_ids_
// and subscription_to_policy_ids_.
auto policy_it = subscription_to_policy_ids_.find(subscription_id);
if (policy_it != subscription_to_policy_ids_.end()) {
policy_to_subscription_ids_.erase(policy_it->second);
subscription_to_policy_ids_.erase(policy_it);
}
}
LOG(WARNING) << "Subscription " << subscription_id << " ended with status "
<< status;
for (const auto& identifier : sub_to_remove->GetParams().identifiers()) {
std::shared_ptr<ResourceMonitor> resource_monitor;
{
absl::MutexLock lock(resource_monitors_mutex_);
auto it = resource_monitors_.find(identifier);
if (it == resource_monitors_.end()) {
continue;
}
resource_monitor = it->second;
}
absl::Status remove_status =
resource_monitor->RemoveSubscriber(subscription_id);
if (!remove_status.ok()) {
LOG(ERROR) << "Failed to remove subscriber " << subscription_id
<< " from resource " << identifier << ": " << status;
}
}
}
absl::StatusOr<SubscriptionManager::SubscriptionId>
SubscriptionManagerImpl::AddSubscription(
const milotic_hft::SubscriptionParams& params,
absl::AnyInvocable<void(Payload&&)> on_data_callback) {
LOG(WARNING) << "Add Subscription with params: " << params;
if (feed_client_ == nullptr && on_data_callback == nullptr) {
return absl::InvalidArgumentError("on_data_callback cannot be null.");
}
// Copy the subscription params to append the identifiers to subscribe to.
milotic_hft::SubscriptionParams internal_params = params;
if (params.subscription_policy().configuration_type() ==
milotic_hft::SubscriptionPolicy::
CONFIGURATION_TYPE_CONFIGURE_ALL_RESOURCES) {
// Get all the identifiers from the data source.
ECCLESIA_ASSIGN_OR_RETURN(
std::vector<Identifier> identifiers,
data_source_->GetIdentifiersForResourceType(
{params.subscription_policy().resource_type()}));
internal_params.mutable_identifiers()->Add(identifiers.begin(),
identifiers.end());
}
if (internal_params.identifiers().empty()) {
return absl::InvalidArgumentError(
"Subscription parameters must include at least one identifier or "
"specify CONFIGURATION_TYPE_ALL_RESOURCES.");
}
// Sampling interval must be greater than 0.
if (internal_params.sampling_interval_ms() <= 0) {
return absl::InvalidArgumentError(
"Sampling interval must be greater than 0.");
}
SubscriptionId sub_id = GenerateSubscriptionId();
absl::AnyInvocable<void(Payload&&)> callback;
callback = [this,
cb = std::move(on_data_callback)](Payload&& payload) mutable {
// If an explicit callback is provided in AddSubscription, we know it's a
// manual subscription. So for that subscription, there will be no push done
// via feed client and the callback provided will be called.
if (cb != nullptr) {
cb(std::move(payload));
return;
}
if (feed_client_ != nullptr) {
platforms_syshealth::collection::feed::WriteMetricsRequest request;
*request.add_metric_sets()->mutable_hft_payload()->add_payload() =
std::move(payload);
auto response = feed_client_->WriteMetrics(request);
if (!response.ok()) {
LOG(ERROR) << "Failed to write metrics: " << response.status();
}
}
};
auto active_sub = std::make_shared<ActiveSubscription>(
sub_id, internal_params, std::move(callback),
[this](const SubscriptionId& id, const absl::Status& status) {
this->OnSubscriptionEnd(id, status);
},
task_scheduler_.get(), data_source_.get());
// Create resource monitors for all the identifiers in the subscription
// params.
for (const auto& identifier : internal_params.identifiers()) {
std::shared_ptr<ResourceMonitor> resource_monitor;
{
absl::MutexLock lock(resource_monitors_mutex_);
auto it = resource_monitors_.find(identifier);
if (it == resource_monitors_.end()) {
resource_monitor =
std::make_shared<ResourceMonitor>(identifier, data_source_.get());
resource_monitors_[identifier] = resource_monitor;
} else {
resource_monitor = it->second;
}
}
ECCLESIA_RETURN_IF_ERROR(resource_monitor->AddSubscriber(active_sub));
}
// Add the subscription to the active subscriptions map.
{
absl::MutexLock lock(active_subscriptions_mutex_);
active_subscriptions_[sub_id] = active_sub;
}
active_sub->Begin();
return sub_id;
}
absl::Status SubscriptionManagerImpl::Unsubscribe(
const SubscriptionId& sub_id) {
{
absl::MutexLock lock(active_subscriptions_mutex_);
auto it = active_subscriptions_.find(sub_id);
if (it == active_subscriptions_.end()) {
return absl::NotFoundError(
absl::StrCat("Subscription ", sub_id, " not found for unsubscribe."));
}
LOG(WARNING) << "Unsubscribing " << sub_id;
}
OnSubscriptionEnd(sub_id, absl::CancelledError("Client unsubscribed."));
return absl::OkStatus();
}
SubscriptionManagerImpl::SubscriptionId
SubscriptionManagerImpl::GenerateSubscriptionId() {
return "sub_" + std::to_string(next_subscription_id_++);
}
nlohmann::json SubscriptionManagerImpl::ToJson() const {
nlohmann::json json;
json["next_subscription_id"] = next_subscription_id_.load();
json["task_scheduler"] = task_scheduler_->ToJson();
json["data_source"] = data_source_->GetName();
{
absl::MutexLock lock(active_subscriptions_mutex_);
for (const auto& [id, sub] : active_subscriptions_) {
json["active_subscriptions"].push_back(sub->ToJson());
}
}
json["resource_monitors"] = nlohmann::json::array();
{
absl::MutexLock lock(resource_monitors_mutex_);
for (const auto& [identifier, resource_monitor] : resource_monitors_) {
json["resource_monitors"].push_back(resource_monitor->ToJson());
}
}
return json;
}
} // namespace milotic_hft