#include "tlbmc/subscription/manager_impl.h"
#include <sys/param.h>
#include <atomic>
#include <cmath>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/container/btree_map.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/functional/any_invocable.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "g3/macros.h"
#include "tlbmc/adapter/data_source.h"
#include "tlbmc/subscription/manager.h"
#include "identifier.pb.h"
#include "payload.pb.h"
#include "sensor_payload.pb.h"
#include "subscription_params.pb.h"
#include "tlbmc/scheduler/scheduler.h"
namespace milotic_hft {
namespace {
absl::StatusOr<absl::Time> GetLastSampledTime(const Payload& data) {
if (data.has_high_frequency_sensors_readings_batch()) {
const auto& sensors =
data.high_frequency_sensors_readings_batch().high_frequency_sensors();
if (sensors.empty() || sensors.rbegin()->timestamped_readings().empty()) {
return absl::InvalidArgumentError("Payload contains no readings.");
}
// Get the last sampled time from the last reading of the last sensor.
return absl::FromUnixNanos(
sensors.rbegin()->timestamped_readings().rbegin()->timestamp_ns());
}
return absl::InvalidArgumentError("Unsupported payload type.");
}
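// Downsamples a single sensor's readings: for each bin of width
// `sampling_interval_ms`, keeps the reading whose timestamp is closest to the
// bin midpoint. Returns the kept readings in chronological order.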
absl::StatusOr<std::vector<HighFrequencySensorReading>>
ResampleHighFrequencySensorsReadings(
const HighFrequencySensorsReadings& readings, int sampling_interval_ms) {
std::vector<HighFrequencySensorReading> resampled_readings;
const auto& all_readings = readings.timestamped_readings();
if (all_readings.empty()) {
return absl::InvalidArgumentError("No readings found.");
}
// Convert the sampling interval from milliseconds to nanoseconds. Use int64
// arithmetic to avoid overflowing a 32-bit intermediate.
const int64_t sampling_interval_ns =
static_cast<int64_t>(sampling_interval_ms) * 1000000;
// Store the timestamp of the first reading to normalize bin calculations
const int64_t first_reading_timestamp_ns =
all_readings.begin()->timestamp_ns();
// Maps bin_start_timestamp_ns -> {HighFrequencySensorReading,
// absolute_diff_to_midpoint_ns}. btree_map keeps its keys sorted, ensuring
// the output is chronological.
absl::btree_map<int64_t, std::pair<HighFrequencySensorReading, int64_t>>
best_reading_for_bin;
for (const auto& current_reading : all_readings) {
int64_t current_timestamp_ns = current_reading.timestamp_ns();
// Calculate the start timestamp of the bin this reading belongs to.
// We normalize by `first_reading_timestamp_ns` to ensure bins start
// relative to the earliest data point, preventing large initial empty bins
// if startTime != 0.
int64_t relative_time_ns =
current_timestamp_ns - first_reading_timestamp_ns;
// Integer division truncates toward zero, which equals floor here because
// relative_time_ns is non-negative.
int64_t bin_index = relative_time_ns / sampling_interval_ns;
int64_t bin_start_timestamp_ns =
first_reading_timestamp_ns + bin_index * sampling_interval_ns;
// Calculate the midpoint of this bin
int64_t bin_midpoint_ns = bin_start_timestamp_ns + sampling_interval_ns / 2;
// Calculate the absolute difference from the midpoint
int64_t diff_from_midpoint_ns =
std::abs(current_timestamp_ns - bin_midpoint_ns);
// Check if this bin already has a best reading, or if this reading is
// closer
auto it = best_reading_for_bin.find(bin_start_timestamp_ns);
if (it == best_reading_for_bin.end() ||
diff_from_midpoint_ns < it->second.second) {
// This is the first reading for this bin, or it's closer than the
// previous best
best_reading_for_bin[bin_start_timestamp_ns] = {current_reading,
diff_from_midpoint_ns};
}
}
// Collect the selected readings, ordered by bin start time (btree_map
// iterates its keys in sorted order).
for (const auto& pair_entry : best_reading_for_bin) {
resampled_readings.push_back(pair_entry.second.first);
}
return resampled_readings;
}
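// Resamples a raw payload to the requested sampling interval. Only
// high-frequency sensor reading batches are handled; other payload types
// produce an empty result.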
absl::StatusOr<Payload> ResampleReadings(const Payload& raw_data,
int sampling_interval_ms) {
Payload resampled_data;
// Resample high frequency sensors readings.
if (raw_data.has_high_frequency_sensors_readings_batch()) {
for (const auto& sensor : raw_data.high_frequency_sensors_readings_batch()
.high_frequency_sensors()) {
ECCLESIA_ASSIGN_OR_RETURN(
std::vector<HighFrequencySensorReading> resampled_readings,
ResampleHighFrequencySensorsReadings(sensor, sampling_interval_ms));
HighFrequencySensorsReadings* new_sensor_with_resampled_readings =
resampled_data.mutable_high_frequency_sensors_readings_batch()
->add_high_frequency_sensors();
*new_sensor_with_resampled_readings->mutable_sensor_identifier() =
sensor.sensor_identifier();
*new_sensor_with_resampled_readings->mutable_timestamped_readings() = {
resampled_readings.begin(), resampled_readings.end()};
}
}
// Other payload types can be resampled here.
// For now, only high-frequency sensor readings are supported.
return resampled_data;
}
} // namespace
// ResourceMonitor implementation.
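// Registers a subscription with this resource monitor and reconfigures the
// underlying data source to satisfy the combined requirements of all current
// subscribers.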
absl::Status ResourceMonitor::AddSubscriber(
const std::shared_ptr<ActiveSubscription>& subscription) {
LOG(WARNING) << "Adding subscriber " << subscription->GetId() << " to "
<< identifier_;
{
absl::MutexLock lock(&subscribers_mutex_);
subscribers_.emplace(subscription->GetId(), subscription);
lowest_sampling_interval_subscriptions_.insert(
subscription->GetParams().sampling_interval_ms());
// Calculate the batch size for the data source.
int current_batch_size = subscription->GetParams().export_interval_ms() /
subscription->GetParams().sampling_interval_ms();
max_batch_sizes_.insert(current_batch_size);
LOG(WARNING) << "Max batch size for " << identifier_ << " is "
<< current_batch_size;
}
ECCLESIA_RETURN_IF_ERROR(ConfigureDataSource());
return absl::OkStatus();
}
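// Detaches a subscription from this resource monitor and reconfigures the
// data source based on the remaining subscribers.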
absl::Status ResourceMonitor::RemoveSubscriber(const SubscriptionId& id) {
int sampling_interval_ms_to_remove;
{
absl::MutexLock lock(&subscribers_mutex_);
LOG(WARNING) << "Removing subscriber " << id << " from " << identifier_
<< " with sampling interval " << current_sampling_interval_ms_;
auto it = subscribers_.find(id);
if (it == subscribers_.end()) {
return absl::InvalidArgumentError(
absl::StrCat("Subscriber ", id, " not found."));
}
// Get the sampling interval for the subscription.
sampling_interval_ms_to_remove =
it->second->GetParams().sampling_interval_ms();
// Get the batch size for the subscription.
int current_batch_size = it->second->GetParams().export_interval_ms() /
it->second->GetParams().sampling_interval_ms();
// Remove the subscription from the set of subscribers.
subscribers_.erase(it);
// Remove the sampling interval from the set of lowest sampling intervals.
lowest_sampling_interval_subscriptions_.erase(
lowest_sampling_interval_subscriptions_.find(
sampling_interval_ms_to_remove));
// Remove the batch size from the set of max batch sizes for the data
// source.
max_batch_sizes_.erase(max_batch_sizes_.find(current_batch_size));
}
ECCLESIA_RETURN_IF_ERROR(ConfigureDataSource());
return absl::OkStatus();
}
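// Applies the lowest requested sampling interval across all current
// subscribers to the data source and updates its batch size; both are reset
// to defaults when no subscribers remain.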
absl::Status ResourceMonitor::ConfigureDataSource() {
absl::MutexLock lock(&subscribers_mutex_);
int lowest_sampling_interval_ms =
lowest_sampling_interval_subscriptions_.empty()
? -1
: *lowest_sampling_interval_subscriptions_.begin();
// Update the max batch size for the data source.
int max_batch_size =
max_batch_sizes_.empty() ? -1 : *max_batch_sizes_.begin();
bool set_max_batch_size =
max_batch_size != current_max_batch_size_ && max_batch_size != -1;
bool set_sampling_interval =
lowest_sampling_interval_ms != current_sampling_interval_ms_ &&
lowest_sampling_interval_ms != -1;
bool reset_max_batch_size = max_batch_size == -1;
bool reset_sampling_interval = lowest_sampling_interval_ms == -1;
if (set_max_batch_size) {
LOG(WARNING) << "Setting max batch size for " << identifier_ << " to "
<< max_batch_size;
ECCLESIA_RETURN_IF_ERROR(
data_source_->ConfigureBatchSize(identifier_, max_batch_size));
} else if (reset_max_batch_size) {
ECCLESIA_RETURN_IF_ERROR(
data_source_->ResetBatchSizeToDefault(identifier_));
}
if (set_sampling_interval) {
LOG(WARNING) << "Setting effective sampling interval for " << identifier_
<< " to " << lowest_sampling_interval_ms;
ECCLESIA_RETURN_IF_ERROR(data_source_->ConfigureSamplingInterval(
identifier_, lowest_sampling_interval_ms));
} else if (reset_sampling_interval) {
ECCLESIA_RETURN_IF_ERROR(
data_source_->ResetSamplingIntervalToDefault(identifier_));
}
current_sampling_interval_ms_ = lowest_sampling_interval_ms;
current_max_batch_size_ = max_batch_size;
return absl::OkStatus();
}
// ActiveSubscription implementation.
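// Forwards one batch of data to the client and, for bounded subscriptions,
// ends the subscription once the requested number of batches has been
// delivered.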
void ActiveSubscription::DeliverData(Payload&& raw_data) {
if (batches_remaining_ == 0) {
return;
}
// Deliver the data to the client.
on_data_callback_(std::move(raw_data));
// If batches_remaining_ is -1, the subscription is indefinite and will be
// ended when the client unsubscribes.
if (batches_remaining_ < 0) {
return;
}
// Subscription is limited to a fixed number of batches.
// If the number of batches is reached, the subscription will be ended.
--batches_remaining_;
DLOG(INFO) << "Subscription " << id_ << " delivered batch. "
<< batches_remaining_ << " batches remaining.";
if (batches_remaining_ == 0) {
on_subscription_ended_callback_(id_, absl::OkStatus());
}
}
ActiveSubscription::ActiveSubscription(
const SubscriptionId& id, const milotic_hft::SubscriptionParams& params,
absl::AnyInvocable<void(Payload&&)> on_data_callback,
absl::AnyInvocable<void(const SubscriptionId&, const absl::Status&)>
on_subscription_ended,
milotic_tlbmc::TaskScheduler* task_scheduler, DataSource* data_source)
: id_(id),
params_(params),
on_data_callback_(std::move(on_data_callback)),
on_subscription_ended_callback_(std::move(on_subscription_ended)),
batches_remaining_(params.num_batches() == 0 ? -1 : params.num_batches()),
task_scheduler_(task_scheduler),
data_source_(data_source) {
LOG(WARNING) << "ActiveSubscription " << id_
<< " created. num_batches: " << params_.num_batches();
}
void ActiveSubscription::Begin() {
// Schedule the periodic collection task for this subscription. The task
// collects data from the data source and downsamples it to the sampling
// interval requested in the subscription params.
auto task = [weak_self = weak_from_this()](
absl::AnyInvocable<void()> on_done) mutable {
std::shared_ptr<ActiveSubscription> self = weak_self.lock();
if (self == nullptr) {
on_done();
return;
}
// Collect the data for all the identifiers in the subscription params.
Payload consolidated_data;
for (const auto& identifier : self->params_.identifiers()) {
absl::Time last_sampled_time;
{
absl::MutexLock lock(&self->identifier_to_last_sampled_time_ms_mutex_);
last_sampled_time = self->identifier_to_last_sampled_time_[identifier];
}
absl::StatusOr<Payload> data =
self->data_source_->Collect(identifier, last_sampled_time);
if (!data.ok() || data->ByteSizeLong() == 0) {
LOG(ERROR) << "Failed to collect data for subscription " << self->id_
<< ": " << data.status();
on_done();
return;
}
// Resample the data to the desired sampling interval.
absl::StatusOr<Payload> resampled_data =
ResampleReadings(*data, self->params_.sampling_interval_ms());
if (!resampled_data.ok()) {
LOG(ERROR) << "Failed to resample data for subscription " << self->id_
<< ": " << resampled_data.status();
on_done();
return;
}
// Get last sampled time for the identifier from the resampled data.
absl::StatusOr<absl::Time> updated_sample_time =
GetLastSampledTime(*resampled_data);
if (!updated_sample_time.ok()) {
LOG(ERROR) << "Failed to get last sampled time for subscription "
<< self->id_ << ": " << updated_sample_time.status();
on_done();
return;
}
last_sampled_time = *updated_sample_time;
// Merge the data into the consolidated data as there may be multiple
// identifiers in the subscription params.
consolidated_data.MergeFrom(*resampled_data);
{
absl::MutexLock lock(&self->identifier_to_last_sampled_time_ms_mutex_);
self->identifier_to_last_sampled_time_[identifier] = last_sampled_time;
}
}
self->DeliverData(std::move(consolidated_data));
on_done();
};
task_id_ = task_scheduler_->ScheduleAsync(
task, absl::Milliseconds(params_.export_interval_ms()));
}
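// Removes the subscription from the active set and detaches it from every
// resource monitor it was registered with.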
void SubscriptionManagerImpl::OnSubscriptionEnd(
const SubscriptionId& subscription_id, const absl::Status& status) {
std::shared_ptr<ActiveSubscription> sub_to_remove;
{
absl::MutexLock lock(&active_subscriptions_mutex_);
auto it = active_subscriptions_.find(subscription_id);
if (it == active_subscriptions_.end()) {
return;
}
sub_to_remove = it->second;
active_subscriptions_.erase(it);
}
LOG(WARNING) << "Subscription " << subscription_id << " ended with status "
<< status;
for (const auto& identifier : sub_to_remove->GetParams().identifiers()) {
std::shared_ptr<ResourceMonitor> resource_monitor;
{
absl::MutexLock lock(&resource_monitors_mutex_);
auto it = resource_monitors_.find(identifier);
if (it == resource_monitors_.end()) {
continue;
}
resource_monitor = it->second;
}
absl::Status remove_status =
resource_monitor->RemoveSubscriber(subscription_id);
if (!remove_status.ok()) {
LOG(ERROR) << "Failed to remove subscriber " << subscription_id
<< " from resource " << identifier << ": " << status;
}
}
}
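// Validates the subscription parameters, registers the subscription with a
// resource monitor for each identifier, and starts periodic data delivery.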
absl::StatusOr<SubscriptionManager::SubscriptionId>
SubscriptionManagerImpl::AddSubscription(
const milotic_hft::SubscriptionParams& params,
absl::AnyInvocable<void(Payload&&)> on_data_callback) {
LOG(WARNING) << "Add Subscription with params: " << params;
if (on_data_callback == nullptr) {
return absl::InvalidArgumentError("on_data_callback cannot be null.");
}
// Copy the subscription params to append the identifiers to subscribe to.
milotic_hft::SubscriptionParams internal_params = params;
if (params.subscription_policy().configuration_type() ==
milotic_hft::SubscriptionPolicy::
CONFIGURATION_TYPE_CONFIGURE_ALL_RESOURCES) {
// Get all the identifiers from the data source.
ECCLESIA_ASSIGN_OR_RETURN(
std::vector<Identifier> identifiers,
data_source_->GetIdentifiersForResourceType(
{params.subscription_policy().resource_type()}));
internal_params.mutable_identifiers()->Add(identifiers.begin(),
identifiers.end());
}
if (internal_params.identifiers().empty()) {
return absl::InvalidArgumentError(
"Subscription parameters must include at least one identifier or "
"specify CONFIGURATION_TYPE_ALL_RESOURCES.");
}
// Sampling interval must be greater than 0.
if (internal_params.sampling_interval_ms() <= 0) {
return absl::InvalidArgumentError(
"Sampling interval must be greater than 0.");
}
SubscriptionId sub_id = GenerateSubscriptionId();
auto active_sub = std::make_shared<ActiveSubscription>(
sub_id, internal_params, std::move(on_data_callback),
[this](const SubscriptionId& id, const absl::Status& status) {
this->OnSubscriptionEnd(id, status);
},
task_scheduler_.get(), data_source_.get());
// Create resource monitors for all the identifiers in the subscription
// params.
for (const auto& identifier : internal_params.identifiers()) {
std::shared_ptr<ResourceMonitor> resource_monitor;
{
absl::MutexLock lock(&resource_monitors_mutex_);
auto it = resource_monitors_.find(identifier);
if (it == resource_monitors_.end()) {
resource_monitor =
std::make_shared<ResourceMonitor>(identifier, data_source_.get());
resource_monitors_[identifier] = resource_monitor;
} else {
resource_monitor = it->second;
}
}
ECCLESIA_RETURN_IF_ERROR(resource_monitor->AddSubscriber(active_sub));
}
// Add the subscription to the active subscriptions map.
{
absl::MutexLock lock(&active_subscriptions_mutex_);
active_subscriptions_[sub_id] = active_sub;
}
active_sub->Begin();
return sub_id;
}
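// Ends an active subscription at the client's request.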
absl::Status SubscriptionManagerImpl::Unsubscribe(
const SubscriptionId& sub_id) {
{
absl::MutexLock lock(&active_subscriptions_mutex_);
auto it = active_subscriptions_.find(sub_id);
if (it == active_subscriptions_.end()) {
return absl::NotFoundError(
absl::StrCat("Subscription ", sub_id, " not found for unsubscribe."));
}
LOG(WARNING) << "Unsubscribing " << sub_id;
}
OnSubscriptionEnd(sub_id, absl::CancelledError("Client unsubscribed."));
return absl::OkStatus();
}
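// Generates a monotonically increasing subscription id of the form "sub_<n>".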
SubscriptionManagerImpl::SubscriptionId
SubscriptionManagerImpl::GenerateSubscriptionId() {
return "sub_" + std::to_string(next_subscription_id_++);
}
} // namespace milotic_hft