| #include "tlbmc/subscription/manager_impl.h" |
| |
| #include <sys/param.h> |
| |
| #include <atomic> |
| #include <cmath> |
| #include <cstdint> |
| #include <cstdlib> |
| #include <memory> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "absl/container/btree_map.h" |
| #include "absl/container/flat_hash_map.h" |
| #include "absl/container/flat_hash_set.h" |
| #include "absl/functional/any_invocable.h" |
| #include "absl/log/log.h" |
| #include "absl/status/status.h" |
| #include "absl/status/statusor.h" |
| #include "absl/strings/str_cat.h" |
| #include "absl/synchronization/mutex.h" |
| #include "absl/time/time.h" |
| #include "g3/macros.h" |
| #include "tlbmc/adapter/data_source.h" |
| #include "tlbmc/subscription/manager.h" |
| #include "identifier.pb.h" |
| #include "payload.pb.h" |
| #include "sensor_payload.pb.h" |
| #include "subscription_params.pb.h" |
| #include "tlbmc/scheduler/scheduler.h" |
| |
| namespace milotic_hft { |
| |
| namespace { |
| |
absl::StatusOr<absl::Time> GetLastSampledTime(const Payload& data) {
  if (data.has_high_frequency_sensors_readings_batch()) {
    const auto& sensors =
        data.high_frequency_sensors_readings_batch().high_frequency_sensors();
    if (sensors.empty() || sensors.rbegin()->timestamped_readings().empty()) {
      return absl::InvalidArgumentError("Payload contains no readings.");
    }
    // Get the last sampled time from the last reading of the last sensor.
    return absl::FromUnixNanos(
        sensors.rbegin()->timestamped_readings().rbegin()->timestamp_ns());
  }
  return absl::InvalidArgumentError("Unsupported payload type.");
}
| |
| absl::StatusOr<std::vector<HighFrequencySensorReading>> |
| ResampleHighFrequencySensorsReadings( |
| const HighFrequencySensorsReadings& readings, int sampling_interval_ms) { |
| std::vector<HighFrequencySensorReading> resampled_readings; |
| const auto& all_readings = readings.timestamped_readings(); |
| |
  if (all_readings.empty()) {
    return absl::InvalidArgumentError("No readings found.");
  }
  if (sampling_interval_ms <= 0) {
    return absl::InvalidArgumentError(
        "Sampling interval must be greater than 0.");
  }

  // Convert the sampling interval from milliseconds to nanoseconds. Widen to
  // int64_t before multiplying to avoid 32-bit overflow.
  const int64_t sampling_interval_ns =
      static_cast<int64_t>(sampling_interval_ms) * 1000000;
| |
| // Store the timestamp of the first reading to normalize bin calculations |
| const int64_t first_reading_timestamp_ns = |
| all_readings.begin()->timestamp_ns(); |
| |
  // Maps bin_start_timestamp_ns -> {HighFrequencySensorReading,
  // absolute_diff_to_midpoint_ns}. The btree_map keeps its keys sorted, so the
  // output is chronological.
| absl::btree_map<int64_t, std::pair<HighFrequencySensorReading, int64_t>> |
| best_reading_for_bin; |
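  // For illustration: with a 10 ms interval and readings at +1 ms, +4 ms, and
  // +12 ms from the first timestamp, the +1 ms and +4 ms readings fall in the
  // bin [0 ms, 10 ms) whose midpoint is +5 ms, so the +4 ms reading (closest
  // to the midpoint) is kept; the +12 ms reading is the only one in
  // [10 ms, 20 ms) and is kept as well.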
| |
| for (const auto& current_reading : all_readings) { |
| int64_t current_timestamp_ns = current_reading.timestamp_ns(); |
| |
    // Calculate the start timestamp of the bin this reading belongs to.
    // We normalize by `first_reading_timestamp_ns` so that bins start relative
    // to the earliest data point, preventing large initial empty bins when the
    // first timestamp is far from zero. Readings are assumed to be in
    // chronological order, so `relative_time_ns` is non-negative and integer
    // division floors it.
    int64_t relative_time_ns =
        current_timestamp_ns - first_reading_timestamp_ns;
    int64_t bin_index = relative_time_ns / sampling_interval_ns;
| int64_t bin_start_timestamp_ns = |
| first_reading_timestamp_ns + bin_index * sampling_interval_ns; |
| |
| // Calculate the midpoint of this bin |
| int64_t bin_midpoint_ns = bin_start_timestamp_ns + sampling_interval_ns / 2; |
| |
| // Calculate the absolute difference from the midpoint |
| int64_t diff_from_midpoint_ns = |
| std::abs(current_timestamp_ns - bin_midpoint_ns); |
| |
| // Check if this bin already has a best reading, or if this reading is |
| // closer |
| auto it = best_reading_for_bin.find(bin_start_timestamp_ns); |
| if (it == best_reading_for_bin.end() || |
| diff_from_midpoint_ns < it->second.second) { |
| // This is the first reading for this bin, or it's closer than the |
| // previous best |
| best_reading_for_bin[bin_start_timestamp_ns] = {current_reading, |
| diff_from_midpoint_ns}; |
| } |
| } |
| |
  // Collect the selected readings, ordered by bin start time (the btree_map
  // iterates its keys in sorted order).
| for (const auto& pair_entry : best_reading_for_bin) { |
| resampled_readings.push_back(pair_entry.second.first); |
| } |
| |
| return resampled_readings; |
| } |
| |
| absl::StatusOr<Payload> ResampleReadings(const Payload& raw_data, |
| int sampling_interval_ms) { |
| Payload resampled_data; |
| |
| // Resample high frequency sensors readings. |
| if (raw_data.has_high_frequency_sensors_readings_batch()) { |
| for (const auto& sensor : raw_data.high_frequency_sensors_readings_batch() |
| .high_frequency_sensors()) { |
| ECCLESIA_ASSIGN_OR_RETURN( |
| std::vector<HighFrequencySensorReading> resampled_readings, |
| ResampleHighFrequencySensorsReadings(sensor, sampling_interval_ms)); |
| HighFrequencySensorsReadings* new_sensor_with_resampled_readings = |
| resampled_data.mutable_high_frequency_sensors_readings_batch() |
| ->add_high_frequency_sensors(); |
| *new_sensor_with_resampled_readings->mutable_sensor_identifier() = |
| sensor.sensor_identifier(); |
| *new_sensor_with_resampled_readings->mutable_timestamped_readings() = { |
| resampled_readings.begin(), resampled_readings.end()}; |
| } |
| } |
  // Other payload types can be resampled here; for now, only high frequency
  // sensor readings are supported.
| |
| return resampled_data; |
| } |
| |
| } // namespace |
| |
| // ResourceMonitor implementation. |
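//
// A ResourceMonitor aggregates the requirements of every subscriber to a
// single resource. The sorted multisets `lowest_sampling_interval_subscriptions_`
// and `max_batch_sizes_` track each subscriber's requested sampling interval
// and batch size; duplicates are kept so that removing one subscriber does not
// discard another subscriber's identical request. ConfigureDataSource()
// recomputes the effective data source configuration from these sets whenever
// a subscriber is added or removed.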
| absl::Status ResourceMonitor::AddSubscriber( |
| const std::shared_ptr<ActiveSubscription>& subscription) { |
| LOG(WARNING) << "Adding subscriber " << subscription->GetId() << " to " |
| << identifier_; |
| { |
| absl::MutexLock lock(&subscribers_mutex_); |
| subscribers_.emplace(subscription->GetId(), subscription); |
| lowest_sampling_interval_subscriptions_.insert( |
| subscription->GetParams().sampling_interval_ms()); |
    // Calculate the batch size this subscriber requires from the data source.
    int current_batch_size = subscription->GetParams().export_interval_ms() /
                             subscription->GetParams().sampling_interval_ms();
    max_batch_sizes_.insert(current_batch_size);
    LOG(WARNING) << "Requested batch size for " << identifier_ << " is "
                 << current_batch_size;
| } |
| ECCLESIA_RETURN_IF_ERROR(ConfigureDataSource()); |
| return absl::OkStatus(); |
| } |
| |
| absl::Status ResourceMonitor::RemoveSubscriber(const SubscriptionId& id) { |
| int sampling_interval_ms_to_remove; |
| { |
| absl::MutexLock lock(&subscribers_mutex_); |
| LOG(WARNING) << "Removing subscriber " << id << " from " << identifier_ |
| << " with sampling interval " << current_sampling_interval_ms_; |
| auto it = subscribers_.find(id); |
| if (it == subscribers_.end()) { |
| return absl::InvalidArgumentError( |
| absl::StrCat("Subscriber ", id, " not found.")); |
| } |
| // Get the sampling interval for the subscription. |
| sampling_interval_ms_to_remove = |
| it->second->GetParams().sampling_interval_ms(); |
| // Get the batch size for the subscription. |
| int current_batch_size = it->second->GetParams().export_interval_ms() / |
| it->second->GetParams().sampling_interval_ms(); |
| // Remove the subscription from the set of subscribers. |
| subscribers_.erase(it); |
| // Remove the sampling interval from the set of lowest sampling intervals. |
| lowest_sampling_interval_subscriptions_.erase( |
| lowest_sampling_interval_subscriptions_.find( |
| sampling_interval_ms_to_remove)); |
| // Remove the batch size from the set of max batch sizes for the data |
| // source. |
| max_batch_sizes_.erase(max_batch_sizes_.find(current_batch_size)); |
| } |
| ECCLESIA_RETURN_IF_ERROR(ConfigureDataSource()); |
| return absl::OkStatus(); |
| } |
| |
| absl::Status ResourceMonitor::ConfigureDataSource() { |
| absl::MutexLock lock(&subscribers_mutex_); |
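  // -1 is used below as a "no subscribers left" sentinel: when either multiset
  // is empty the data source is reset to its defaults instead of being
  // configured with a stale value from a removed subscriber.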
| int lowest_sampling_interval_ms = |
| lowest_sampling_interval_subscriptions_.empty() |
| ? -1 |
| : *lowest_sampling_interval_subscriptions_.begin(); |
| // Update the max batch size for the data source. |
| int max_batch_size = |
| max_batch_sizes_.empty() ? -1 : *max_batch_sizes_.begin(); |
| |
| bool set_max_batch_size = |
| max_batch_size != current_max_batch_size_ && max_batch_size != -1; |
| bool set_sampling_interval = |
| lowest_sampling_interval_ms != current_sampling_interval_ms_ && |
| lowest_sampling_interval_ms != -1; |
| bool reset_max_batch_size = max_batch_size == -1; |
| bool reset_sampling_interval = lowest_sampling_interval_ms == -1; |
| |
| if (set_max_batch_size) { |
| LOG(WARNING) << "Setting max batch size for " << identifier_ << " to " |
| << max_batch_size; |
| ECCLESIA_RETURN_IF_ERROR( |
| data_source_->ConfigureBatchSize(identifier_, max_batch_size)); |
| } else if (reset_max_batch_size) { |
| ECCLESIA_RETURN_IF_ERROR( |
| data_source_->ResetBatchSizeToDefault(identifier_)); |
| } |
| |
| if (set_sampling_interval) { |
| LOG(WARNING) << "Setting effective sampling interval for " << identifier_ |
| << " to " << lowest_sampling_interval_ms; |
| ECCLESIA_RETURN_IF_ERROR(data_source_->ConfigureSamplingInterval( |
| identifier_, lowest_sampling_interval_ms)); |
| } else if (reset_sampling_interval) { |
| ECCLESIA_RETURN_IF_ERROR( |
| data_source_->ResetSamplingIntervalToDefault(identifier_)); |
| } |
| current_sampling_interval_ms_ = lowest_sampling_interval_ms; |
| current_max_batch_size_ = max_batch_size; |
| return absl::OkStatus(); |
| } |
| |
| // ActiveSubscription implementation. |
| void ActiveSubscription::DeliverData(Payload&& raw_data) { |
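  // `batches_remaining_` encodes the delivery quota: 0 means the quota has
  // already been exhausted, and -1 (set by the constructor when num_batches is
  // 0) means the subscription is indefinite.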
| if (batches_remaining_ == 0) { |
| return; |
| } |
| |
| // Deliver the data to the client. |
| on_data_callback_(std::move(raw_data)); |
| |
| // If batches_remaining_ is -1, the subscription is indefinite and will be |
| // ended when the client unsubscribes. |
| if (batches_remaining_ < 0) { |
| return; |
| } |
| |
| // Subscription is limited to a fixed number of batches. |
| // If the number of batches is reached, the subscription will be ended. |
| --batches_remaining_; |
| DLOG(INFO) << "Subscription " << id_ << " delivered batch. " |
| << batches_remaining_ << " batches remaining."; |
| if (batches_remaining_ == 0) { |
| on_subscription_ended_callback_(id_, absl::OkStatus()); |
| } |
| } |
| |
| ActiveSubscription::ActiveSubscription( |
| const SubscriptionId& id, const milotic_hft::SubscriptionParams& params, |
| absl::AnyInvocable<void(Payload&&)> on_data_callback, |
| absl::AnyInvocable<void(const SubscriptionId&, const absl::Status&)> |
| on_subscription_ended, |
| milotic_tlbmc::TaskScheduler* task_scheduler, DataSource* data_source) |
| : id_(id), |
| params_(params), |
| on_data_callback_(std::move(on_data_callback)), |
| on_subscription_ended_callback_(std::move(on_subscription_ended)), |
| batches_remaining_(params.num_batches() == 0 ? -1 : params.num_batches()), |
| task_scheduler_(task_scheduler), |
| data_source_(data_source) { |
| LOG(WARNING) << "ActiveSubscription " << id_ |
| << " created. num_batches: " << params_.num_batches(); |
| } |
| |
void ActiveSubscription::Begin() {
  // Schedule the periodic task for this subscription. The task collects data
  // from the data source and downsamples it to the sampling interval requested
  // in the subscription params.
| |
| auto task = [weak_self = weak_from_this()]( |
| absl::AnyInvocable<void()> on_done) mutable { |
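    // The task captures only a weak_ptr so that a pending scheduler run does
    // not keep the subscription alive; if the subscription has already been
    // destroyed, the task is a no-op.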
| std::shared_ptr<ActiveSubscription> self = weak_self.lock(); |
| if (self == nullptr) { |
| on_done(); |
| return; |
| } |
| |
| // Collect the data for all the identifiers in the subscription params. |
| Payload consolidated_data; |
| for (const auto& identifier : self->params_.identifiers()) { |
| absl::Time last_sampled_time; |
| { |
| absl::MutexLock lock(&self->identifier_to_last_sampled_time_ms_mutex_); |
| last_sampled_time = self->identifier_to_last_sampled_time_[identifier]; |
| } |
| absl::StatusOr<Payload> data = |
| self->data_source_->Collect(identifier, last_sampled_time); |
      if (!data.ok()) {
        LOG(ERROR) << "Failed to collect data for subscription " << self->id_
                   << ": " << data.status();
        on_done();
        return;
      }
      if (data->ByteSizeLong() == 0) {
        // No new data for this identifier; end this collection cycle without
        // delivering a batch.
        on_done();
        return;
      }
| |
| // Resample the data to the desired sampling interval. |
| absl::StatusOr<Payload> resampled_data = |
| ResampleReadings(*data, self->params_.sampling_interval_ms()); |
| if (!resampled_data.ok()) { |
| LOG(ERROR) << "Failed to resample data for subscription " << self->id_ |
| << ": " << resampled_data.status(); |
| on_done(); |
| return; |
| } |
| |
| // Get last sampled time for the identifier from the resampled data. |
| absl::StatusOr<absl::Time> updated_sample_time = |
| GetLastSampledTime(*resampled_data); |
| if (!updated_sample_time.ok()) { |
| LOG(ERROR) << "Failed to get last sampled time for subscription " |
| << self->id_ << ": " << updated_sample_time.status(); |
| on_done(); |
| return; |
| } |
| last_sampled_time = *updated_sample_time; |
| |
| // Merge the data into the consolidated data as there may be multiple |
| // identifiers in the subscription params. |
| consolidated_data.MergeFrom(*resampled_data); |
| { |
| absl::MutexLock lock(&self->identifier_to_last_sampled_time_ms_mutex_); |
| self->identifier_to_last_sampled_time_[identifier] = last_sampled_time; |
| } |
| } |
| self->DeliverData(std::move(consolidated_data)); |
| on_done(); |
| }; |
| |
| task_id_ = task_scheduler_->ScheduleAsync( |
| task, absl::Milliseconds(params_.export_interval_ms())); |
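  // ScheduleAsync is expected to invoke the task repeatedly, once every
  // export_interval_ms, until the subscription is removed.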
| } |
| |
| void SubscriptionManagerImpl::OnSubscriptionEnd( |
| const SubscriptionId& subscription_id, const absl::Status& status) { |
| std::shared_ptr<ActiveSubscription> sub_to_remove; |
| { |
| absl::MutexLock lock(&active_subscriptions_mutex_); |
| auto it = active_subscriptions_.find(subscription_id); |
| if (it == active_subscriptions_.end()) { |
| return; |
| } |
| sub_to_remove = it->second; |
| active_subscriptions_.erase(it); |
| } |
| LOG(WARNING) << "Subscription " << subscription_id << " ended with status " |
| << status; |
| for (const auto& identifier : sub_to_remove->GetParams().identifiers()) { |
| std::shared_ptr<ResourceMonitor> resource_monitor; |
| { |
| absl::MutexLock lock(&resource_monitors_mutex_); |
| auto it = resource_monitors_.find(identifier); |
| if (it == resource_monitors_.end()) { |
| continue; |
| } |
| resource_monitor = it->second; |
| } |
| absl::Status remove_status = |
| resource_monitor->RemoveSubscriber(subscription_id); |
    if (!remove_status.ok()) {
      LOG(ERROR) << "Failed to remove subscriber " << subscription_id
                 << " from resource " << identifier << ": " << remove_status;
    }
| } |
| } |
| |
| absl::StatusOr<SubscriptionManager::SubscriptionId> |
| SubscriptionManagerImpl::AddSubscription( |
| const milotic_hft::SubscriptionParams& params, |
| absl::AnyInvocable<void(Payload&&)> on_data_callback) { |
| LOG(WARNING) << "Add Subscription with params: " << params; |
| if (on_data_callback == nullptr) { |
| return absl::InvalidArgumentError("on_data_callback cannot be null."); |
| } |
| |
| // Copy the subscription params to append the identifiers to subscribe to. |
| milotic_hft::SubscriptionParams internal_params = params; |
| if (params.subscription_policy().configuration_type() == |
| milotic_hft::SubscriptionPolicy:: |
| CONFIGURATION_TYPE_CONFIGURE_ALL_RESOURCES) { |
| // Get all the identifiers from the data source. |
| ECCLESIA_ASSIGN_OR_RETURN( |
| std::vector<Identifier> identifiers, |
| data_source_->GetIdentifiersForResourceType( |
| {params.subscription_policy().resource_type()})); |
| internal_params.mutable_identifiers()->Add(identifiers.begin(), |
| identifiers.end()); |
| } |
| |
  if (internal_params.identifiers().empty()) {
    return absl::InvalidArgumentError(
        "Subscription parameters must include at least one identifier or "
        "specify CONFIGURATION_TYPE_CONFIGURE_ALL_RESOURCES.");
  }
| |
| // Sampling interval must be greater than 0. |
| if (internal_params.sampling_interval_ms() <= 0) { |
| return absl::InvalidArgumentError( |
| "Sampling interval must be greater than 0."); |
| } |
| |
| SubscriptionId sub_id = GenerateSubscriptionId(); |
| auto active_sub = std::make_shared<ActiveSubscription>( |
| sub_id, internal_params, std::move(on_data_callback), |
| [this](const SubscriptionId& id, const absl::Status& status) { |
| this->OnSubscriptionEnd(id, status); |
| }, |
| task_scheduler_.get(), data_source_.get()); |
| |
| // Create resource monitors for all the identifiers in the subscription |
| // params. |
| for (const auto& identifier : internal_params.identifiers()) { |
| std::shared_ptr<ResourceMonitor> resource_monitor; |
| { |
| absl::MutexLock lock(&resource_monitors_mutex_); |
| auto it = resource_monitors_.find(identifier); |
| if (it == resource_monitors_.end()) { |
| resource_monitor = |
| std::make_shared<ResourceMonitor>(identifier, data_source_.get()); |
| resource_monitors_[identifier] = resource_monitor; |
| } else { |
| resource_monitor = it->second; |
| } |
| } |
| ECCLESIA_RETURN_IF_ERROR(resource_monitor->AddSubscriber(active_sub)); |
| } |
| |
| // Add the subscription to the active subscriptions map. |
| { |
| absl::MutexLock lock(&active_subscriptions_mutex_); |
| active_subscriptions_[sub_id] = active_sub; |
| } |
| active_sub->Begin(); |
| return sub_id; |
| } |
| |
| absl::Status SubscriptionManagerImpl::Unsubscribe( |
| const SubscriptionId& sub_id) { |
| { |
| absl::MutexLock lock(&active_subscriptions_mutex_); |
| auto it = active_subscriptions_.find(sub_id); |
| if (it == active_subscriptions_.end()) { |
| return absl::NotFoundError( |
| absl::StrCat("Subscription ", sub_id, " not found for unsubscribe.")); |
| } |
| LOG(WARNING) << "Unsubscribing " << sub_id; |
| } |
| OnSubscriptionEnd(sub_id, absl::CancelledError("Client unsubscribed.")); |
| return absl::OkStatus(); |
| } |
| |
| SubscriptionManagerImpl::SubscriptionId |
| SubscriptionManagerImpl::GenerateSubscriptionId() { |
| return "sub_" + std::to_string(next_subscription_id_++); |
| } |
| |
| } // namespace milotic_hft |