| #include "tlbmc/subscription/manager_impl.h" |
| |
| #include <sys/param.h> |
| #include <unistd.h> |
| |
| #include <atomic> |
| #include <cmath> |
| #include <cstdint> |
| #include <cstdlib> |
| #include <memory> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "tlbmc/feed_payload.pb.h" |
| #include "tlbmc/collection_entity.pb.h" |
| #include "tlbmc/feed_client_interface.h" |
| #include "tlbmc/hft_subscription.pb.h" |
| #include "tlbmc/client_subscription.pb.h" |
| #include "tlbmc/service_messages.pb.h" |
| #include "one/offline_node_entities.pb.h" |
| #include "one/resolved_entities.pb.h" |
| #include "absl/container/btree_map.h" |
| #include "absl/container/flat_hash_map.h" |
| #include "absl/container/flat_hash_set.h" |
| #include "absl/functional/any_invocable.h" |
| #include "absl/log/log.h" |
| #include "absl/status/status.h" |
| #include "absl/status/statusor.h" |
| #include "absl/strings/str_cat.h" |
| #include "absl/synchronization/mutex.h" |
| #include "absl/time/time.h" |
| #include "g3/macros.h" |
| #include "nlohmann/json_fwd.hpp" |
| #include "tlbmc/adapter/data_source.h" |
| #include "tlbmc/subscription/manager.h" |
| #include "identifier.pb.h" |
| #include "payload.pb.h" |
| #include "sensor_payload.pb.h" |
| #include "subscription_params.pb.h" |
| #include "resource.pb.h" |
| #include "tlbmc/scheduler/scheduler.h" |
| |
namespace milotic_hft {

namespace {

using ::platforms_syshealth::collection::feed::FeedClientInterface;
using ::platforms_syshealth::collection::feed::GetPoliciesRequest;
using ::platforms_syshealth::collection::feed::GetPoliciesResponse;

absl::StatusOr<absl::Time> GetLastSampledTime(const Payload& data) {
  if (data.has_high_frequency_sensors_readings_batch()) {
    const auto& sensors =
        data.high_frequency_sensors_readings_batch().high_frequency_sensors();
    if (sensors.empty()) {
      return absl::NotFoundError("No sensors found in the batch.");
    }
    const auto& readings = sensors.rbegin()->timestamped_readings();
    if (readings.empty()) {
      return absl::NotFoundError("No readings found.");
    }

    // Get the last sampled time from the last sensor. Readings are assumed to
    // be in ascending timestamp order, so the last reading is the newest.
    return absl::FromUnixNanos(readings.rbegin()->timestamp_ns());
  }
  return absl::InvalidArgumentError("Unsupported payload type.");
}
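// Downsamples `readings` to one reading per `sampling_interval_ms` by
// partitioning time into fixed-width bins anchored at the first reading and
// keeping, for each bin, the single reading closest to the bin's midpoint.
// Readings are assumed to be in ascending timestamp order.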
absl::StatusOr<std::vector<HighFrequencySensorReading>>
ResampleHighFrequencySensorsReadings(
    const HighFrequencySensorsReadings& readings, int sampling_interval_ms) {
  std::vector<HighFrequencySensorReading> resampled_readings;
  const auto& all_readings = readings.timestamped_readings();

  if (all_readings.empty()) {
    return absl::InvalidArgumentError("No readings found.");
  }
  if (sampling_interval_ms <= 0) {
    return absl::InvalidArgumentError("Sampling interval must be positive.");
  }

  // Convert the sampling interval from milliseconds to nanoseconds. The
  // explicit cast avoids 32-bit overflow for large intervals.
  const int64_t sampling_interval_ns =
      static_cast<int64_t>(sampling_interval_ms) * 1000000;

  // Store the timestamp of the first (assumed earliest) reading to normalize
  // bin calculations.
  const int64_t first_reading_timestamp_ns =
      all_readings.begin()->timestamp_ns();

  // Maps bin_start_timestamp_ns -> {HighFrequencySensorReading,
  // absolute_diff_to_midpoint_ns}. btree_map automatically keeps keys sorted,
  // ensuring the output is chronological.
  absl::btree_map<int64_t, std::pair<HighFrequencySensorReading, int64_t>>
      best_reading_for_bin;

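  // Worked example (hypothetical numbers): with sampling_interval_ms = 50 and
  // readings at 0, 40, 60, and 110 ms after the first reading, the bins are
  // [0, 50), [50, 100), and [100, 150), with midpoints at 25, 75, and 125 ms;
  // the readings at 40, 60, and 110 ms are the ones kept.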
  for (const auto& current_reading : all_readings) {
    int64_t current_timestamp_ns = current_reading.timestamp_ns();

    // Calculate the start timestamp of the bin this reading belongs to. We
    // normalize by `first_reading_timestamp_ns` so that bins start relative to
    // the earliest data point, preventing large initial empty bins when the
    // first timestamp is far from zero.
    int64_t relative_time_ns =
        current_timestamp_ns - first_reading_timestamp_ns;
    // Integer division suffices here: relative_time_ns is non-negative for
    // readings in ascending timestamp order.
    int64_t bin_index = relative_time_ns / sampling_interval_ns;
    int64_t bin_start_timestamp_ns =
        first_reading_timestamp_ns + bin_index * sampling_interval_ns;

    // Calculate the midpoint of this bin.
    int64_t bin_midpoint_ns = bin_start_timestamp_ns + sampling_interval_ns / 2;

    // Calculate the absolute difference from the midpoint.
    int64_t diff_from_midpoint_ns =
        std::abs(current_timestamp_ns - bin_midpoint_ns);

    // Keep this reading if it is the first one seen for its bin, or if it is
    // closer to the bin midpoint than the current best.
    auto it = best_reading_for_bin.find(bin_start_timestamp_ns);
    if (it == best_reading_for_bin.end() ||
        diff_from_midpoint_ns < it->second.second) {
      best_reading_for_bin[bin_start_timestamp_ns] = {current_reading,
                                                      diff_from_midpoint_ns};
    }
  }

  // Collect the selected readings, ordered by bin start time (btree_map
  // iterates keys in ascending order).
  for (const auto& pair_entry : best_reading_for_bin) {
    resampled_readings.push_back(pair_entry.second.first);
  }

  return resampled_readings;
}

absl::StatusOr<Payload> ResampleReadings(const Payload& raw_data,
                                         int sampling_interval_ms) {
  Payload resampled_data;

  // Resample high-frequency sensor readings.
  if (raw_data.has_high_frequency_sensors_readings_batch()) {
    for (const auto& sensor : raw_data.high_frequency_sensors_readings_batch()
                                  .high_frequency_sensors()) {
      ECCLESIA_ASSIGN_OR_RETURN(
          std::vector<HighFrequencySensorReading> resampled_readings,
          ResampleHighFrequencySensorsReadings(sensor, sampling_interval_ms));
      HighFrequencySensorsReadings* new_sensor_with_resampled_readings =
          resampled_data.mutable_high_frequency_sensors_readings_batch()
              ->add_high_frequency_sensors();
      *new_sensor_with_resampled_readings->mutable_sensor_identifier() =
          sensor.sensor_identifier();
      *new_sensor_with_resampled_readings->mutable_timestamped_readings() = {
          resampled_readings.begin(), resampled_readings.end()};
    }
  }
  // Other payload types can be resampled here; for now only high-frequency
  // sensor readings are supported, so other payload types yield an empty
  // result.

  return resampled_data;
}

}  // namespace

// ResourceMonitor implementation.
absl::Status ResourceMonitor::AddSubscriber(
    const std::shared_ptr<ActiveSubscription>& subscription) {
  LOG(WARNING) << "Adding subscriber " << subscription->GetId() << " to "
               << identifier_;
  {
    absl::MutexLock lock(subscribers_mutex_);
    subscribers_.emplace(subscription->GetId(), subscription);
    lowest_sampling_interval_subscriptions_.insert(
        subscription->GetParams().sampling_interval_ms());
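    // The integer division below assumes export_interval_ms is a positive
    // multiple of sampling_interval_ms; sampling_interval_ms > 0 is validated
    // in SubscriptionManagerImpl::AddSubscription before subscribers are
    // added.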
    // Calculate the batch size for the data source.
    int current_batch_size = subscription->GetParams().export_interval_ms() /
                             subscription->GetParams().sampling_interval_ms();
    max_batch_sizes_.insert(current_batch_size);
    LOG(WARNING) << "Max batch size for " << identifier_ << " is "
                 << current_batch_size;
  }
  ECCLESIA_RETURN_IF_ERROR(ConfigureDataSource());
  return absl::OkStatus();
}

absl::Status ResourceMonitor::RemoveSubscriber(const SubscriptionId& id) {
  int sampling_interval_ms_to_remove;
  {
    absl::MutexLock lock(subscribers_mutex_);
    LOG(WARNING) << "Removing subscriber " << id << " from " << identifier_
                 << " with sampling interval " << current_sampling_interval_ms_;
    auto it = subscribers_.find(id);
    if (it == subscribers_.end()) {
      return absl::InvalidArgumentError(
          absl::StrCat("Subscriber ", id, " not found."));
    }
    // Get the sampling interval for the subscription.
    sampling_interval_ms_to_remove =
        it->second->GetParams().sampling_interval_ms();
    // Get the batch size for the subscription.
    int current_batch_size = it->second->GetParams().export_interval_ms() /
                             it->second->GetParams().sampling_interval_ms();
    // Remove the subscription from the set of subscribers.
    subscribers_.erase(it);
    // Remove one instance of the sampling interval from the requested
    // sampling intervals (erase(find(...)) removes a single element, not all
    // equal ones).
    lowest_sampling_interval_subscriptions_.erase(
        lowest_sampling_interval_subscriptions_.find(
            sampling_interval_ms_to_remove));
    // Likewise remove one instance of the batch size from the requested batch
    // sizes for the data source.
    max_batch_sizes_.erase(max_batch_sizes_.find(current_batch_size));
  }
  ECCLESIA_RETURN_IF_ERROR(ConfigureDataSource());
  return absl::OkStatus();
}

nlohmann::json ResourceMonitor::ToJson() const {
  nlohmann::json json;
  json["identifier"] = absl::StrCat(identifier_);
  json["current_sampling_interval_ms"] = current_sampling_interval_ms_.load();
  json["current_max_batch_size"] = current_max_batch_size_.load();
  {
    absl::MutexLock lock(subscribers_mutex_);
    json["num_subscribers"] = subscribers_.size();
    nlohmann::json subscribers = nlohmann::json::array();
    for (const auto& [id, sub] : subscribers_) {
      subscribers.push_back(id);
    }
    json["subscribers"] = subscribers;
    json["lowest_sampling_interval_subscriptions"] = nlohmann::json::array();
    for (const auto& sampling_interval_ms :
         lowest_sampling_interval_subscriptions_) {
      json["lowest_sampling_interval_subscriptions"].push_back(
          sampling_interval_ms);
    }
    json["max_batch_sizes"] = nlohmann::json::array();
    for (const auto& max_batch_size : max_batch_sizes_) {
      json["max_batch_sizes"].push_back(max_batch_size);
    }
  }
  return json;
}

absl::Status ResourceMonitor::ConfigureDataSource() {
  absl::MutexLock lock(subscribers_mutex_);
  int lowest_sampling_interval_ms =
      lowest_sampling_interval_subscriptions_.empty()
          ? -1
          : *lowest_sampling_interval_subscriptions_.begin();
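  // NOTE: the next line assumes max_batch_sizes_ is ordered so that *begin()
  // is the largest requested batch size (e.g. a container with a descending
  // comparator); with a default ascending order it would be the smallest.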
  // Update the max batch size for the data source.
  int max_batch_size =
      max_batch_sizes_.empty() ? -1 : *max_batch_sizes_.begin();

  bool set_max_batch_size =
      max_batch_size != current_max_batch_size_ && max_batch_size != -1;
  bool set_sampling_interval =
      lowest_sampling_interval_ms != current_sampling_interval_ms_ &&
      lowest_sampling_interval_ms != -1;
  bool reset_max_batch_size = max_batch_size == -1;
  bool reset_sampling_interval = lowest_sampling_interval_ms == -1;

  if (set_max_batch_size) {
    LOG(WARNING) << "Setting max batch size for " << identifier_ << " to "
                 << max_batch_size;
    ECCLESIA_RETURN_IF_ERROR(
        data_source_->ConfigureBatchSize(identifier_, max_batch_size));
  } else if (reset_max_batch_size) {
    ECCLESIA_RETURN_IF_ERROR(
        data_source_->ResetBatchSizeToDefault(identifier_));
  }

  if (set_sampling_interval) {
    LOG(WARNING) << "Setting effective sampling interval for " << identifier_
                 << " to " << lowest_sampling_interval_ms;
    ECCLESIA_RETURN_IF_ERROR(data_source_->ConfigureSamplingInterval(
        identifier_, lowest_sampling_interval_ms));
  } else if (reset_sampling_interval) {
    ECCLESIA_RETURN_IF_ERROR(
        data_source_->ResetSamplingIntervalToDefault(identifier_));
  }
  current_sampling_interval_ms_ = lowest_sampling_interval_ms;
  current_max_batch_size_ = max_batch_size;
  return absl::OkStatus();
}

SubscriptionManagerImpl::SubscriptionManagerImpl(
    std::unique_ptr<DataSource> data_source)
    : next_subscription_id_(0),
      task_scheduler_(std::make_unique<milotic_tlbmc::TaskScheduler>()),
      data_source_(std::move(data_source)) {}

SubscriptionManagerImpl::SubscriptionManagerImpl(
    std::unique_ptr<DataSource> data_source, FeedClientInterface* feed_client,
    const Options& options,
    const platforms_syshealth::collection::feed::NodeEntityId& node_entity_id)
    : next_subscription_id_(0),
      task_scheduler_(std::make_unique<milotic_tlbmc::TaskScheduler>()),
      data_source_(std::move(data_source)),
      feed_client_(feed_client),
      get_policies_interval_(options.get_policies_interval),
      endpoint_type_(options.endpoint_type),
      node_entity_id_(node_entity_id) {
  if (feed_client_ != nullptr) {
    get_policies_task_id_ = task_scheduler_->ScheduleAsync(
        [this](absl::AnyInvocable<void()> on_done) {
          PullAgentPoliciesAndUpdateSubscriptions();
          on_done();
        },
        get_policies_interval_);
  }
}

SubscriptionManagerImpl::~SubscriptionManagerImpl() {
  if (get_policies_task_id_ != -1) {
    task_scheduler_->Cancel(get_policies_task_id_);
  }
  task_scheduler_->Stop();
}

void SubscriptionManagerImpl::PullAgentPoliciesAndUpdateSubscriptions() {
  if (feed_client_ == nullptr) {
    return;
  }
  LOG(INFO) << "Getting policies from agent config service";
  GetPoliciesRequest request;
  request.mutable_endpoint_identifier()->set_endpoint_type(endpoint_type_);
  *request.mutable_entity_id()->mutable_node_entity() = node_entity_id_;
  absl::StatusOr<GetPoliciesResponse> response =
      feed_client_->GetPolicies(request);
  if (!response.ok()) {
    LOG(ERROR) << "Failed to get policies: " << response.status();
    return;
  }
  LOG(INFO) << "Received " << response->subscription_policies_size()
            << " policies";

  absl::flat_hash_map<
      std::string,
      const platforms_syshealth::collection::feed::ClientSubscriptionPolicy*>
      response_policies;
  for (const auto& policy : response->subscription_policies()) {
    if (policy.has_hft_subscription()) {
      response_policies[policy.policy_id()] = &policy;
    }
  }

  std::vector<platforms_syshealth::collection::feed::ClientSubscriptionPolicy>
      policies_to_add;
  std::vector<std::pair<std::string, SubscriptionId>> sub_ids_to_remove;

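  // Reconcile the desired state (policies reported by the service) with the
  // current state: subscribe to new policies, drop subscriptions whose policy
  // disappeared, and replace subscriptions whose parameters changed.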
  {
    absl::MutexLock lock(active_subscriptions_mutex_);
    for (const auto& [policy_id, policy] : response_policies) {
      if (!policy_to_subscription_ids_.contains(policy_id)) {
        policies_to_add.push_back(*policy);
      }
    }

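    // Parameter changes are detected by comparing the textual renderings of
    // the two SubscriptionParams protos (via absl::StrCat / AbslStringify);
    // this assumes the rendering is deterministic for equivalent messages.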
    for (const auto& [policy_id, sub_id] : policy_to_subscription_ids_) {
      if (!response_policies.contains(policy_id)) {
        sub_ids_to_remove.push_back({policy_id, sub_id});
      } else if (absl::StrCat(response_policies.at(policy_id)
                                  ->hft_subscription()
                                  .subscription_params()) !=
                 absl::StrCat(active_subscriptions_.at(sub_id)->GetParams())) {
        LOG(INFO) << "Policy " << policy_id
                  << " has changed, updating subscription";
        sub_ids_to_remove.push_back({policy_id, sub_id});
        policies_to_add.push_back(*response_policies.at(policy_id));
      }
    }
  }

  for (const auto& policy : policies_to_add) {
    LOG(INFO) << "Adding subscription for policy " << policy.policy_id();
    absl::StatusOr<SubscriptionId> sub_id = AddSubscription(
        policy.hft_subscription().subscription_params(), nullptr);
    if (sub_id.ok()) {
      absl::MutexLock lock(active_subscriptions_mutex_);
      // If the subscription already ended, it may have been removed from
      // active_subscriptions_, in which case we don't record it in
      // policy_to_subscription_ids_.
      if (active_subscriptions_.contains(*sub_id)) {
        policy_to_subscription_ids_[policy.policy_id()] = *sub_id;
        subscription_to_policy_ids_[*sub_id] = policy.policy_id();
      }
    } else {
      LOG(ERROR) << "Failed to add subscription for policy "
                 << policy.policy_id() << ": " << sub_id.status();
    }
  }

  for (const auto& [policy_id, sub_id] : sub_ids_to_remove) {
    LOG(INFO) << "Removing subscription for policy " << policy_id;
    absl::Status status = Unsubscribe(sub_id);
    if (!status.ok() && !absl::IsNotFound(status)) {
      LOG(ERROR) << "Failed to unsubscribe policy " << policy_id
                 << " with subscription id " << sub_id << ": " << status;
    }
  }
}

// ActiveSubscription implementation.
void ActiveSubscription::DeliverData(Payload&& raw_data) {
  if (batches_remaining_ == 0) {
    return;
  }

  // Deliver the data to the client.
  on_data_callback_(std::move(raw_data));

  // If batches_remaining_ is -1, the subscription is indefinite and ends only
  // when the client unsubscribes.
  if (batches_remaining_ < 0) {
    return;
  }

  // The subscription is limited to a fixed number of batches; once that number
  // is reached, the subscription ends.
  --batches_remaining_;
  DLOG(INFO) << "Subscription " << id_ << " delivered batch. "
             << batches_remaining_ << " batches remaining.";
  if (batches_remaining_ == 0) {
    on_subscription_ended_callback_(id_, absl::OkStatus());
  }
}

ActiveSubscription::ActiveSubscription(
    const SubscriptionId& id, const milotic_hft::SubscriptionParams& params,
    absl::AnyInvocable<void(Payload&&)> on_data_callback,
    absl::AnyInvocable<void(const SubscriptionId&, const absl::Status&)>
        on_subscription_ended,
    milotic_tlbmc::TaskScheduler* task_scheduler, DataSource* data_source)
    : id_(id),
      params_(params),
      on_data_callback_(std::move(on_data_callback)),
      on_subscription_ended_callback_(std::move(on_subscription_ended)),
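      // num_batches() == 0 denotes an unbounded subscription; -1 is the
      // internal sentinel for "no batch limit" (see DeliverData()).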
      batches_remaining_(params.num_batches() == 0 ? -1 : params.num_batches()),
      task_scheduler_(task_scheduler),
      data_source_(data_source) {
  LOG(WARNING) << "ActiveSubscription " << id_
               << " created. num_batches: " << params_.num_batches();
}

void ActiveSubscription::Begin() {
  // Schedule the periodic collection task for this subscription. The task
  // asks the data source to collect data and then downsamples it to the
  // sampling interval requested in the subscription params.

  auto task = [weak_self = weak_from_this()](
                  absl::AnyInvocable<void()> on_done) mutable {
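    // The task holds only a weak_ptr to the subscription; if the subscription
    // has already been destroyed, lock() fails below and the task becomes a
    // no-op instead of touching freed state.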
    std::shared_ptr<ActiveSubscription> self = weak_self.lock();
    if (self == nullptr) {
      on_done();
      return;
    }

    // Collect the data for all the identifiers in the subscription params.
    Payload consolidated_data;
    for (const auto& identifier : self->params_.identifiers()) {
      absl::Time last_sampled_time;
      {
        absl::MutexLock lock(self->identifier_to_last_sampled_time_ms_mutex_);
        last_sampled_time = self->identifier_to_last_sampled_time_[identifier];
      }
      absl::StatusOr<Payload> data =
          self->data_source_->Collect(identifier, last_sampled_time);
      if (!data.ok() || data->ByteSizeLong() == 0) {
        LOG(ERROR) << "Failed to collect data (or collected an empty payload) "
                   << "for subscription " << self->id_ << ": " << data.status();
        // There is no valid payload in this case. Create a placeholder payload
        // with a STATUS_MISSING state and a status message so that something
        // can still be delivered.
        Payload empty_payload;
        HighFrequencySensorsReadings* sensor_readings =
            empty_payload.mutable_high_frequency_sensors_readings_batch()
                ->mutable_high_frequency_sensors()
                ->Add();

        *sensor_readings->mutable_sensor_identifier() =
            identifier.sensor_identifier();
        milotic_hft::State state;
        state.set_status(milotic_hft::STATUS_MISSING);
        state.set_status_message(
            absl::StrCat("Failed to collect data for identifier ", identifier,
                         " with status: ", data.status()));
        *sensor_readings->mutable_state() = state;
        consolidated_data.MergeFrom(empty_payload);
        continue;
      }

      // Resample the data to the desired sampling interval.
      absl::StatusOr<Payload> resampled_data = *data;
      if (!data->has_high_frequency_sensors_readings_batch() ||
          data->high_frequency_sensors_readings_batch()
                  .configured_sampling_interval_ms() !=
              self->params_.sampling_interval_ms()) {
        resampled_data =
            ResampleReadings(*data, self->params_.sampling_interval_ms());
        if (!resampled_data.ok()) {
          LOG(ERROR) << "Failed to resample data for subscription "
                     << self->id_ << ": " << resampled_data.status();
          consolidated_data.MergeFrom(*data);
          continue;
        }
      }

      // Get the last sampled time for the identifier from the resampled data.
      absl::StatusOr<absl::Time> updated_sample_time =
          GetLastSampledTime(*resampled_data);
      if (!updated_sample_time.ok()) {
        LOG(ERROR) << "Failed to get last sampled time for subscription "
                   << self->id_ << ": " << updated_sample_time.status();
        consolidated_data.MergeFrom(*resampled_data);
        continue;
      }
      last_sampled_time = *updated_sample_time;

      // Merge the data into the consolidated payload, as there may be multiple
      // identifiers in the subscription params.
      consolidated_data.MergeFrom(*resampled_data);
      {
        absl::MutexLock lock(self->identifier_to_last_sampled_time_ms_mutex_);
        self->identifier_to_last_sampled_time_[identifier] = last_sampled_time;
      }
    }
    self->DeliverData(std::move(consolidated_data));
    on_done();
  };

  task_id_ = task_scheduler_->ScheduleAsync(
      task, absl::Milliseconds(params_.export_interval_ms()));
}

nlohmann::json ActiveSubscription::ToJson() const {
  nlohmann::json json;
  json["id"] = id_;
  json["params"] = absl::StrCat(params_);
  json["batches_remaining"] = batches_remaining_.load();
  json["data_source"] = data_source_->GetName();
  absl::MutexLock lock(identifier_to_last_sampled_time_ms_mutex_);
  for (const auto& [identifier, last_sampled_time] :
       identifier_to_last_sampled_time_) {
    nlohmann::json entry;
    entry["identifier"] = absl::StrCat(identifier);
    entry["last_sampled_time"] = absl::FormatTime(last_sampled_time);
    json["identifier_to_last_sampled_time"].push_back(entry);
  }
  return json;
}

void SubscriptionManagerImpl::OnSubscriptionEnd(
    const SubscriptionId& subscription_id, const absl::Status& status) {
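  // Remove the subscription from the bookkeeping maps while holding the lock,
  // then detach it from its resource monitors after releasing the lock, so
  // that active_subscriptions_mutex_ is not held across ResourceMonitor calls.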
  std::shared_ptr<ActiveSubscription> sub_to_remove;
  {
    absl::MutexLock lock(active_subscriptions_mutex_);
    auto it = active_subscriptions_.find(subscription_id);
    if (it == active_subscriptions_.end()) {
      return;
    }
    sub_to_remove = it->second;
    active_subscriptions_.erase(it);

    // If the subscription came from a policy, remove it from
    // policy_to_subscription_ids_ and subscription_to_policy_ids_.
    auto policy_it = subscription_to_policy_ids_.find(subscription_id);
    if (policy_it != subscription_to_policy_ids_.end()) {
      policy_to_subscription_ids_.erase(policy_it->second);
      subscription_to_policy_ids_.erase(policy_it);
    }
  }
  LOG(WARNING) << "Subscription " << subscription_id << " ended with status "
               << status;
  for (const auto& identifier : sub_to_remove->GetParams().identifiers()) {
    std::shared_ptr<ResourceMonitor> resource_monitor;
    {
      absl::MutexLock lock(resource_monitors_mutex_);
      auto it = resource_monitors_.find(identifier);
      if (it == resource_monitors_.end()) {
        continue;
      }
      resource_monitor = it->second;
    }
    absl::Status remove_status =
        resource_monitor->RemoveSubscriber(subscription_id);
    if (!remove_status.ok()) {
      LOG(ERROR) << "Failed to remove subscriber " << subscription_id
                 << " from resource " << identifier << ": " << remove_status;
    }
  }
}

absl::StatusOr<SubscriptionManager::SubscriptionId>
SubscriptionManagerImpl::AddSubscription(
    const milotic_hft::SubscriptionParams& params,
    absl::AnyInvocable<void(Payload&&)> on_data_callback) {
  LOG(WARNING) << "Add Subscription with params: " << params;
  if (feed_client_ == nullptr && on_data_callback == nullptr) {
    return absl::InvalidArgumentError(
        "on_data_callback cannot be null when no feed client is configured.");
  }

  // Copy the subscription params so the identifiers to subscribe to can be
  // appended.
  milotic_hft::SubscriptionParams internal_params = params;
  if (params.subscription_policy().configuration_type() ==
      milotic_hft::SubscriptionPolicy::
          CONFIGURATION_TYPE_CONFIGURE_ALL_RESOURCES) {
    // Get all the identifiers from the data source.
    ECCLESIA_ASSIGN_OR_RETURN(
        std::vector<Identifier> identifiers,
        data_source_->GetIdentifiersForResourceType(
            {params.subscription_policy().resource_type()}));
    internal_params.mutable_identifiers()->Add(identifiers.begin(),
                                               identifiers.end());
  }

  if (internal_params.identifiers().empty()) {
    return absl::InvalidArgumentError(
        "Subscription parameters must include at least one identifier or "
        "specify CONFIGURATION_TYPE_CONFIGURE_ALL_RESOURCES.");
  }

  // The sampling interval must be greater than 0.
  if (internal_params.sampling_interval_ms() <= 0) {
    return absl::InvalidArgumentError(
        "Sampling interval must be greater than 0.");
  }

  SubscriptionId sub_id = GenerateSubscriptionId();
  absl::AnyInvocable<void(Payload&&)> callback;
  callback = [this,
              cb = std::move(on_data_callback)](Payload&& payload) mutable {
    // An explicit callback passed to AddSubscription marks a manual
    // subscription: the payload goes to that callback and nothing is pushed
    // via the feed client. Otherwise the payload is written through the feed
    // client.
    if (cb != nullptr) {
      cb(std::move(payload));
      return;
    }
    if (feed_client_ != nullptr) {
      platforms_syshealth::collection::feed::WriteMetricsRequest request;
      *request.add_metric_sets()->mutable_hft_payload()->add_payload() =
          std::move(payload);
      auto response = feed_client_->WriteMetrics(request);
      if (!response.ok()) {
        LOG(ERROR) << "Failed to write metrics: " << response.status();
      }
    }
  };
  auto active_sub = std::make_shared<ActiveSubscription>(
      sub_id, internal_params, std::move(callback),
      [this](const SubscriptionId& id, const absl::Status& status) {
        this->OnSubscriptionEnd(id, status);
      },
      task_scheduler_.get(), data_source_.get());

  // Create resource monitors for all the identifiers in the subscription
  // params.
  for (const auto& identifier : internal_params.identifiers()) {
    std::shared_ptr<ResourceMonitor> resource_monitor;
    {
      absl::MutexLock lock(resource_monitors_mutex_);
      auto it = resource_monitors_.find(identifier);
      if (it == resource_monitors_.end()) {
        resource_monitor =
            std::make_shared<ResourceMonitor>(identifier, data_source_.get());
        resource_monitors_[identifier] = resource_monitor;
      } else {
        resource_monitor = it->second;
      }
    }
    ECCLESIA_RETURN_IF_ERROR(resource_monitor->AddSubscriber(active_sub));
  }

  // Add the subscription to the active subscriptions map.
  {
    absl::MutexLock lock(active_subscriptions_mutex_);
    active_subscriptions_[sub_id] = active_sub;
  }
  active_sub->Begin();
  return sub_id;
}

absl::Status SubscriptionManagerImpl::Unsubscribe(
    const SubscriptionId& sub_id) {
  {
    absl::MutexLock lock(active_subscriptions_mutex_);
    auto it = active_subscriptions_.find(sub_id);
    if (it == active_subscriptions_.end()) {
      return absl::NotFoundError(
          absl::StrCat("Subscription ", sub_id, " not found for unsubscribe."));
    }
    LOG(WARNING) << "Unsubscribing " << sub_id;
  }
  OnSubscriptionEnd(sub_id, absl::CancelledError("Client unsubscribed."));
  return absl::OkStatus();
}

SubscriptionManagerImpl::SubscriptionId
SubscriptionManagerImpl::GenerateSubscriptionId() {
  return absl::StrCat("sub_", next_subscription_id_++);
}

nlohmann::json SubscriptionManagerImpl::ToJson() const {
  nlohmann::json json;
  json["next_subscription_id"] = next_subscription_id_.load();
  json["task_scheduler"] = task_scheduler_->ToJson();
  json["data_source"] = data_source_->GetName();
  json["active_subscriptions"] = nlohmann::json::array();
  {
    absl::MutexLock lock(active_subscriptions_mutex_);
    for (const auto& [id, sub] : active_subscriptions_) {
      json["active_subscriptions"].push_back(sub->ToJson());
    }
  }
  json["resource_monitors"] = nlohmann::json::array();
  {
    absl::MutexLock lock(resource_monitors_mutex_);
    for (const auto& [identifier, resource_monitor] : resource_monitors_) {
      json["resource_monitors"].push_back(resource_monitor->ToJson());
    }
  }
  return json;
}

}  // namespace milotic_hft