#include "tlbmc/subscription/manager_impl.h"

#include <sys/param.h>

#include <atomic>
#include <cmath>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "tlbmc/feed_payload.pb.h"
#include "tlbmc/feed_client_interface.h"
#include "tlbmc/hft_subscription.pb.h"
#include "tlbmc/client_subscription.pb.h"
#include "tlbmc/service_messages.pb.h"
#include "absl/container/btree_map.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/functional/any_invocable.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "g3/macros.h"
#include "nlohmann/json_fwd.hpp"
#include "tlbmc/adapter/data_source.h"
#include "tlbmc/subscription/manager.h"
#include "identifier.pb.h"
#include "payload.pb.h"
#include "sensor_payload.pb.h"
#include "subscription_params.pb.h"
#include "resource.pb.h"
#include "tlbmc/scheduler/scheduler.h"

namespace milotic_hft {

namespace {

using ::platforms_syshealth::collection::feed::FeedClientInterface;
using ::platforms_syshealth::collection::feed::GetPoliciesRequest;
using ::platforms_syshealth::collection::feed::GetPoliciesResponse;

absl::StatusOr<absl::Time> GetLastSampledTime(const Payload& data) {
  if (data.has_high_frequency_sensors_readings_batch()) {
    const auto& sensors =
        data.high_frequency_sensors_readings_batch().high_frequency_sensors();
    if (sensors.empty()) {
      return absl::NotFoundError("No sensors found in the batch.");
    }
    const auto& readings = sensors.rbegin()->timestamped_readings();
    if (readings.empty()) {
      return absl::NotFoundError("No readings found.");
    }

    // Get the last sampled time from the last sensor.
    return absl::FromUnixNanos(readings.rbegin()->timestamp_ns());
  }
  return absl::InvalidArgumentError("Unsupported payload type.");
}

absl::StatusOr<std::vector<HighFrequencySensorReading>>
ResampleHighFrequencySensorsReadings(
    const HighFrequencySensorsReadings& readings, int sampling_interval_ms) {
  std::vector<HighFrequencySensorReading> resampled_readings;
  const auto& all_readings = readings.timestamped_readings();

  if (all_readings.empty()) {
    return absl::InvalidArgumentError("No readings found.");
  }

  // Convert sampling interval from milliseconds to nanoseconds
  const int64_t sampling_interval_ns = sampling_interval_ms * 1000000;

  // Store the timestamp of the first reading to normalize bin calculations
  const int64_t first_reading_timestamp_ns =
      all_readings.begin()->timestamp_ns();

  // Map to store: bin_start_timestamp_ns -> {HighFrequencySensorReading,
  // absolute_diff_to_midpoint_ns} btree_map automatically keeps keys sorted,
  // ensuring output is chronological.
  absl::btree_map<int64_t, std::pair<HighFrequencySensorReading, int64_t>>
      best_reading_for_bin;

  for (const auto& current_reading : all_readings) {
    int64_t current_timestamp_ns = current_reading.timestamp_ns();

    // Calculate the start timestamp of the bin this reading belongs to.
    // We normalize by `first_reading_timestamp_ns` to ensure bins start
    // relative to the earliest data point, preventing large initial empty bins
    // if startTime != 0.
    int64_t relative_time_ns =
        current_timestamp_ns - first_reading_timestamp_ns;
    int64_t bin_index = static_cast<int64_t>(
        std::floor((relative_time_ns) / sampling_interval_ns));
    int64_t bin_start_timestamp_ns =
        first_reading_timestamp_ns + bin_index * sampling_interval_ns;

    // Calculate the midpoint of this bin
    int64_t bin_midpoint_ns = bin_start_timestamp_ns + sampling_interval_ns / 2;

    // Calculate the absolute difference from the midpoint
    int64_t diff_from_midpoint_ns =
        std::abs(current_timestamp_ns - bin_midpoint_ns);

    // Check if this bin already has a best reading, or if this reading is
    // closer
    auto it = best_reading_for_bin.find(bin_start_timestamp_ns);
    if (it == best_reading_for_bin.end() ||
        diff_from_midpoint_ns < it->second.second) {
      // This is the first reading for this bin, or it's closer than the
      // previous best
      best_reading_for_bin[bin_start_timestamp_ns] = {current_reading,
                                                      diff_from_midpoint_ns};
    }
  }

  // Collect the selected readings, ordered by bin start time (due to std::map's
  // nature)
  for (const auto& pair_entry : best_reading_for_bin) {
    resampled_readings.push_back(pair_entry.second.first);
  }

  return resampled_readings;
}

absl::StatusOr<Payload> ResampleReadings(const Payload& raw_data,
                                         int sampling_interval_ms) {
  Payload resampled_data;

  // Resample high frequency sensors readings.
  if (raw_data.has_high_frequency_sensors_readings_batch()) {
    for (const auto& sensor : raw_data.high_frequency_sensors_readings_batch()
                                  .high_frequency_sensors()) {
      ECCLESIA_ASSIGN_OR_RETURN(
          std::vector<HighFrequencySensorReading> resampled_readings,
          ResampleHighFrequencySensorsReadings(sensor, sampling_interval_ms));
      HighFrequencySensorsReadings* new_sensor_with_resampled_readings =
          resampled_data.mutable_high_frequency_sensors_readings_batch()
              ->add_high_frequency_sensors();
      *new_sensor_with_resampled_readings->mutable_sensor_identifier() =
          sensor.sensor_identifier();
      *new_sensor_with_resampled_readings->mutable_timestamped_readings() = {
          resampled_readings.begin(), resampled_readings.end()};
    }
  }
  // Other payload types can be resampled here.
  // For now, we only support high frequency sensors readings.

  return resampled_data;
}

}  // namespace

// ResourceMonitor implementation.
absl::Status ResourceMonitor::AddSubscriber(
    const std::shared_ptr<ActiveSubscription>& subscription) {
  LOG(WARNING) << "Adding subscriber " << subscription->GetId() << " to "
               << identifier_;
  {
    absl::MutexLock lock(subscribers_mutex_);
    subscribers_.emplace(subscription->GetId(), subscription);
    lowest_sampling_interval_subscriptions_.insert(
        subscription->GetParams().sampling_interval_ms());
    // Calculate the batch size for the data source.
    int current_batch_size = subscription->GetParams().export_interval_ms() /
                             subscription->GetParams().sampling_interval_ms();
    max_batch_sizes_.insert(current_batch_size);
    LOG(WARNING) << "Max batch size for " << identifier_ << " is "
                 << current_batch_size;
  }
  ECCLESIA_RETURN_IF_ERROR(ConfigureDataSource());
  return absl::OkStatus();
}

absl::Status ResourceMonitor::RemoveSubscriber(const SubscriptionId& id) {
  int sampling_interval_ms_to_remove;
  {
    absl::MutexLock lock(subscribers_mutex_);
    LOG(WARNING) << "Removing subscriber " << id << " from " << identifier_
                 << " with sampling interval " << current_sampling_interval_ms_;
    auto it = subscribers_.find(id);
    if (it == subscribers_.end()) {
      return absl::InvalidArgumentError(
          absl::StrCat("Subscriber ", id, " not found."));
    }
    // Get the sampling interval for the subscription.
    sampling_interval_ms_to_remove =
        it->second->GetParams().sampling_interval_ms();
    // Get the batch size for the subscription.
    int current_batch_size = it->second->GetParams().export_interval_ms() /
                             it->second->GetParams().sampling_interval_ms();
    // Remove the subscription from the set of subscribers.
    subscribers_.erase(it);
    // Remove the sampling interval from the set of lowest sampling intervals.
    lowest_sampling_interval_subscriptions_.erase(
        lowest_sampling_interval_subscriptions_.find(
            sampling_interval_ms_to_remove));
    // Remove the batch size from the set of max batch sizes for the data
    // source.
    max_batch_sizes_.erase(max_batch_sizes_.find(current_batch_size));
  }
  ECCLESIA_RETURN_IF_ERROR(ConfigureDataSource());
  return absl::OkStatus();
}

nlohmann::json ResourceMonitor::ToJson() const {
  nlohmann::json json;
  json["identifier"] = absl::StrCat(identifier_);
  json["current_sampling_interval_ms"] = current_sampling_interval_ms_.load();
  json["current_max_batch_size"] = current_max_batch_size_.load();
  {
    absl::MutexLock lock(subscribers_mutex_);
    json["num_subscribers"] = subscribers_.size();
    nlohmann::json subscribers = nlohmann::json::array();
    for (const auto& [id, sub] : subscribers_) {
      subscribers.push_back(id);
    }
    json["subscribers"] = subscribers;
    json["lowest_sampling_interval_subscriptions"] = nlohmann::json::array();
    for (const auto& sampling_interval_ms :
         lowest_sampling_interval_subscriptions_) {
      json["lowest_sampling_interval_subscriptions"].push_back(
          sampling_interval_ms);
    }
    json["max_batch_sizes"] = nlohmann::json::array();
    for (const auto& max_batch_size : max_batch_sizes_) {
      json["max_batch_sizes"].push_back(max_batch_size);
    }
  }
  return json;
}

absl::Status ResourceMonitor::ConfigureDataSource() {
  absl::MutexLock lock(subscribers_mutex_);
  int lowest_sampling_interval_ms =
      lowest_sampling_interval_subscriptions_.empty()
          ? -1
          : *lowest_sampling_interval_subscriptions_.begin();
  // Update the max batch size for the data source.
  int max_batch_size =
      max_batch_sizes_.empty() ? -1 : *max_batch_sizes_.begin();

  bool set_max_batch_size =
      max_batch_size != current_max_batch_size_ && max_batch_size != -1;
  bool set_sampling_interval =
      lowest_sampling_interval_ms != current_sampling_interval_ms_ &&
      lowest_sampling_interval_ms != -1;
  bool reset_max_batch_size = max_batch_size == -1;
  bool reset_sampling_interval = lowest_sampling_interval_ms == -1;

  if (set_max_batch_size) {
    LOG(WARNING) << "Setting max batch size for " << identifier_ << " to "
                 << max_batch_size;
    ECCLESIA_RETURN_IF_ERROR(
        data_source_->ConfigureBatchSize(identifier_, max_batch_size));
  } else if (reset_max_batch_size) {
    ECCLESIA_RETURN_IF_ERROR(
        data_source_->ResetBatchSizeToDefault(identifier_));
  }

  if (set_sampling_interval) {
    LOG(WARNING) << "Setting effective sampling interval for " << identifier_
                 << " to " << lowest_sampling_interval_ms;
    ECCLESIA_RETURN_IF_ERROR(data_source_->ConfigureSamplingInterval(
        identifier_, lowest_sampling_interval_ms));
  } else if (reset_sampling_interval) {
    ECCLESIA_RETURN_IF_ERROR(
        data_source_->ResetSamplingIntervalToDefault(identifier_));
  }
  current_sampling_interval_ms_ = lowest_sampling_interval_ms;
  current_max_batch_size_ = max_batch_size;
  return absl::OkStatus();
}

SubscriptionManagerImpl::SubscriptionManagerImpl(
    std::unique_ptr<DataSource> data_source)
    : next_subscription_id_(0),
      task_scheduler_(std::make_unique<milotic_tlbmc::TaskScheduler>()),
      data_source_(std::move(data_source)) {}

SubscriptionManagerImpl::SubscriptionManagerImpl(
    std::unique_ptr<DataSource> data_source, FeedClientInterface* feed_client,
    absl::Duration get_policies_interval)
    : next_subscription_id_(0),
      task_scheduler_(std::make_unique<milotic_tlbmc::TaskScheduler>()),
      data_source_(std::move(data_source)),
      feed_client_(feed_client),
      get_policies_interval_(get_policies_interval) {
  if (feed_client_ != nullptr) {
    get_policies_task_id_ = task_scheduler_->ScheduleAsync(
        [this](absl::AnyInvocable<void()> on_done) {
          PullAgentPoliciesAndUpdateSubscriptions();
          on_done();
        },
        get_policies_interval_);
  }
}

SubscriptionManagerImpl::~SubscriptionManagerImpl() {
  if (get_policies_task_id_ != -1) {
    task_scheduler_->Cancel(get_policies_task_id_);
  }
  task_scheduler_->Stop();
}

void SubscriptionManagerImpl::PullAgentPoliciesAndUpdateSubscriptions() {
  if (feed_client_ == nullptr) {
    return;
  }
  LOG(INFO) << "Getting policies from agent config service";
  GetPoliciesRequest request;
  absl::StatusOr<GetPoliciesResponse> response =
      feed_client_->GetPolicies(request);
  if (!response.ok()) {
    LOG(ERROR) << "Failed to get policies: " << response.status();
    return;
  }
  LOG(INFO) << "Received " << response->subscription_policies_size()
            << " policies";

  absl::flat_hash_map<
      std::string,
      const platforms_syshealth::collection::feed::ClientSubscriptionPolicy*>
      response_policies;
  for (const auto& policy : response->subscription_policies()) {
    if (policy.has_hft_subscription()) {
      response_policies[policy.policy_id()] = &policy;
    }
  }

  std::vector<platforms_syshealth::collection::feed::ClientSubscriptionPolicy>
      policies_to_add;
  std::vector<std::pair<std::string, SubscriptionId>> sub_ids_to_remove;

  {
    absl::MutexLock lock(&active_subscriptions_mutex_);
    for (const auto& [policy_id, policy] : response_policies) {
      if (!policy_to_subscription_ids_.contains(policy_id)) {
        policies_to_add.push_back(*policy);
      }
    }

    for (const auto& [policy_id, sub_id] : policy_to_subscription_ids_) {
      if (!response_policies.contains(policy_id)) {
        sub_ids_to_remove.push_back({policy_id, sub_id});
      }
    }
  }

  for (const auto& policy : policies_to_add) {
    LOG(INFO) << "Adding subscription for policy " << policy.policy_id();
    absl::StatusOr<SubscriptionId> sub_id = AddSubscription(
        policy.hft_subscription().subscription_params(), nullptr);
    if (sub_id.ok()) {
      absl::MutexLock lock(&active_subscriptions_mutex_);
      // If subscription already ended, it might have been removed from
      // active_subscriptions_, in which case we don't add it to
      // policy_to_subscription_ids_.
      if (active_subscriptions_.contains(*sub_id)) {
        policy_to_subscription_ids_[policy.policy_id()] = *sub_id;
        subscription_to_policy_ids_[*sub_id] = policy.policy_id();
      }
    } else {
      LOG(ERROR) << "Failed to add subscription for policy "
                 << policy.policy_id() << ": " << sub_id.status();
    }
  }

  for (const auto& [policy_id, sub_id] : sub_ids_to_remove) {
    LOG(INFO) << "Removing subscription for policy " << policy_id;
    absl::Status status = Unsubscribe(sub_id);
    if (!status.ok() && !absl::IsNotFound(status)) {
      LOG(ERROR) << "Failed to unsubscribe policy " << policy_id
                 << " with subscription id " << sub_id << ": " << status;
    }
  }
}

// ActiveSubscription implementation.
void ActiveSubscription::DeliverData(Payload&& raw_data) {
  if (batches_remaining_ == 0) {
    return;
  }

  // Deliver the data to the client.
  on_data_callback_(std::move(raw_data));

  // If batches_remaining_ is -1, the subscription is indefinite and will be
  // ended when the client unsubscribes.
  if (batches_remaining_ < 0) {
    return;
  }

  // Subscription is limited to a fixed number of batches.
  // If the number of batches is reached, the subscription will be ended.
  --batches_remaining_;
  DLOG(INFO) << "Subscription " << id_ << " delivered batch. "
             << batches_remaining_ << " batches remaining.";
  if (batches_remaining_ == 0) {
    on_subscription_ended_callback_(id_, absl::OkStatus());
  }
}

// ActiveSubscription implementation.
ActiveSubscription::ActiveSubscription(
    const SubscriptionId& id, const milotic_hft::SubscriptionParams& params,
    absl::AnyInvocable<void(Payload&&)> on_data_callback,
    absl::AnyInvocable<void(const SubscriptionId&, const absl::Status&)>
        on_subscription_ended,
    milotic_tlbmc::TaskScheduler* task_scheduler, DataSource* data_source)
    : id_(id),
      params_(params),
      on_data_callback_(std::move(on_data_callback)),
      on_subscription_ended_callback_(std::move(on_subscription_ended)),
      batches_remaining_(params.num_batches() == 0 ? -1 : params.num_batches()),
      task_scheduler_(task_scheduler),
      data_source_(data_source) {
  LOG(WARNING) << "ActiveSubscription " << id_
               << " created. num_batches: " << params_.num_batches();
}

// ActiveSubscription implementation.
void ActiveSubscription::Begin() {
  // Schedule all the tasks for this subscription.
  // The task calls the data source to collect the data and then goes over
  // the data and downsample to the desired sampling interval in the
  // subscription params.

  auto task = [weak_self = weak_from_this()](
                  absl::AnyInvocable<void()> on_done) mutable {
    std::shared_ptr<ActiveSubscription> self = weak_self.lock();
    if (self == nullptr) {
      on_done();
      return;
    }

    // Collect the data for all the identifiers in the subscription params.
    Payload consolidated_data;
    for (const auto& identifier : self->params_.identifiers()) {
      absl::Time last_sampled_time;
      {
        absl::MutexLock lock(self->identifier_to_last_sampled_time_ms_mutex_);
        last_sampled_time = self->identifier_to_last_sampled_time_[identifier];
      }
      absl::StatusOr<Payload> data =
          self->data_source_->Collect(identifier, last_sampled_time);
      if (!data.ok() || data->ByteSizeLong() == 0) {
        LOG(ERROR) << "Failed to collect data for subscription " << self->id_
                   << ": " << data.status();
        // In this case, there is no valid payload created. Create a placeholder
        // payload with STATUS_STALE and status message to be able to deliver.
        Payload empty_payload;
        HighFrequencySensorsReadings* sensor_readings =
            empty_payload.mutable_high_frequency_sensors_readings_batch()
                ->mutable_high_frequency_sensors()
                ->Add();

        *sensor_readings->mutable_sensor_identifier() =
            identifier.sensor_identifier();
        milotic_hft::State state;
        state.set_status(milotic_hft::STATUS_MISSING);
        state.set_status_message(
            absl::StrCat("Failed to collect data for identifier ", identifier,
                         " with status: ", data.status()));
        *sensor_readings->mutable_state() = state;
        consolidated_data.MergeFrom(empty_payload);
        continue;
      }

      // Resample the data to the desired sampling interval.
      absl::StatusOr<Payload> resampled_data = *data;
      if (!data->has_high_frequency_sensors_readings_batch() ||
          data->high_frequency_sensors_readings_batch()
                  .configured_sampling_interval_ms() !=
              self->params_.sampling_interval_ms()) {
        resampled_data =
            ResampleReadings(*data, self->params_.sampling_interval_ms());
        if (!resampled_data.ok()) {
          LOG(ERROR) << "Failed to resample data for subscription " << self->id_
                     << ": " << resampled_data.status();
          consolidated_data.MergeFrom(*data);
          continue;
        }
      }

      // Get last sampled time for the identifier from the resampled data.
      absl::StatusOr<absl::Time> updated_sample_time =
          GetLastSampledTime(*resampled_data);
      if (!updated_sample_time.ok()) {
        LOG(ERROR) << "Failed to get last sampled time for subscription "
                   << self->id_ << ": " << updated_sample_time.status();
        consolidated_data.MergeFrom(*resampled_data);
        continue;
      }
      last_sampled_time = *updated_sample_time;

      // Merge the data into the consolidated data as there may be multiple
      // identifiers in the subscription params.
      consolidated_data.MergeFrom(*resampled_data);
      {
        absl::MutexLock lock(self->identifier_to_last_sampled_time_ms_mutex_);
        self->identifier_to_last_sampled_time_[identifier] = last_sampled_time;
      }
    }
    self->DeliverData(std::move(consolidated_data));
    on_done();
  };

  task_id_ = task_scheduler_->ScheduleAsync(
      task, absl::Milliseconds(params_.export_interval_ms()));
}

nlohmann::json ActiveSubscription::ToJson() const {
  nlohmann::json json;
  json["id"] = id_;
  json["params"] = absl::StrCat(params_);
  json["batches_remaining"] = batches_remaining_.load();
  json["data_source"] = data_source_->GetName();
  absl::MutexLock lock(identifier_to_last_sampled_time_ms_mutex_);
  for (const auto& [identifier, last_sampled_time] :
       identifier_to_last_sampled_time_) {
    nlohmann::json entry;
    entry["identifier"] = absl::StrCat(identifier);
    entry["last_sampled_time"] = absl::FormatTime(last_sampled_time);
    json["identifier_to_last_sampled_time"].push_back(entry);
  }
  return json;
}

void SubscriptionManagerImpl::OnSubscriptionEnd(
    const SubscriptionId& subscription_id, const absl::Status& status) {
  std::shared_ptr<ActiveSubscription> sub_to_remove;
  {
    absl::MutexLock lock(active_subscriptions_mutex_);
    auto it = active_subscriptions_.find(subscription_id);
    if (it == active_subscriptions_.end()) {
      return;
    }
    sub_to_remove = it->second;
    active_subscriptions_.erase(it);

    // If subscription is from policy, remove from policy_to_subscription_ids_
    // and subscription_to_policy_ids_.
    auto policy_it = subscription_to_policy_ids_.find(subscription_id);
    if (policy_it != subscription_to_policy_ids_.end()) {
      policy_to_subscription_ids_.erase(policy_it->second);
      subscription_to_policy_ids_.erase(policy_it);
    }
  }
  LOG(WARNING) << "Subscription " << subscription_id << " ended with status "
               << status;
  for (const auto& identifier : sub_to_remove->GetParams().identifiers()) {
    std::shared_ptr<ResourceMonitor> resource_monitor;
    {
      absl::MutexLock lock(resource_monitors_mutex_);
      auto it = resource_monitors_.find(identifier);
      if (it == resource_monitors_.end()) {
        continue;
      }
      resource_monitor = it->second;
    }
    absl::Status remove_status =
        resource_monitor->RemoveSubscriber(subscription_id);
    if (!remove_status.ok()) {
      LOG(ERROR) << "Failed to remove subscriber " << subscription_id
                 << " from resource " << identifier << ": " << status;
    }
  }
}

absl::StatusOr<SubscriptionManager::SubscriptionId>
SubscriptionManagerImpl::AddSubscription(
    const milotic_hft::SubscriptionParams& params,
    absl::AnyInvocable<void(Payload&&)> on_data_callback) {
  LOG(WARNING) << "Add Subscription with params: " << params;
  if (feed_client_ == nullptr && on_data_callback == nullptr) {
    return absl::InvalidArgumentError("on_data_callback cannot be null.");
  }

  // Copy the subscription params to append the identifiers to subscribe to.
  milotic_hft::SubscriptionParams internal_params = params;
  if (params.subscription_policy().configuration_type() ==
      milotic_hft::SubscriptionPolicy::
          CONFIGURATION_TYPE_CONFIGURE_ALL_RESOURCES) {
    // Get all the identifiers from the data source.
    ECCLESIA_ASSIGN_OR_RETURN(
        std::vector<Identifier> identifiers,
        data_source_->GetIdentifiersForResourceType(
            {params.subscription_policy().resource_type()}));
    internal_params.mutable_identifiers()->Add(identifiers.begin(),
                                               identifiers.end());
  }

  if (internal_params.identifiers().empty()) {
    return absl::InvalidArgumentError(
        "Subscription parameters must include at least one identifier or "
        "specify CONFIGURATION_TYPE_ALL_RESOURCES.");
  }

  // Sampling interval must be greater than 0.
  if (internal_params.sampling_interval_ms() <= 0) {
    return absl::InvalidArgumentError(
        "Sampling interval must be greater than 0.");
  }

  SubscriptionId sub_id = GenerateSubscriptionId();
  absl::AnyInvocable<void(Payload&&)> callback;
  callback = [this,
              cb = std::move(on_data_callback)](Payload&& payload) mutable {
    if (feed_client_ != nullptr) {
      platforms_syshealth::collection::feed::WriteMetricsRequest request;
      // If on_data_callback is also provided, we need to copy the payload
      // for WriteMetrics, because the callback will move it. Otherwise, we
      // can move the payload into WriteMetrics.
      if (cb != nullptr) {
        *request.add_metric_sets()->mutable_hft_payload()->add_payload() =
            payload;
      } else {
        *request.add_metric_sets()->mutable_hft_payload()->add_payload() =
            std::move(payload);
      }
      auto response = feed_client_->WriteMetrics(request);
      if (!response.ok()) {
        LOG(ERROR) << "Failed to write metrics: " << response.status();
      }
    }
    if (cb != nullptr) {
      cb(std::move(payload));
    }
  };
  auto active_sub = std::make_shared<ActiveSubscription>(
      sub_id, internal_params, std::move(callback),
      [this](const SubscriptionId& id, const absl::Status& status) {
        this->OnSubscriptionEnd(id, status);
      },
      task_scheduler_.get(), data_source_.get());

  // Create resource monitors for all the identifiers in the subscription
  // params.
  for (const auto& identifier : internal_params.identifiers()) {
    std::shared_ptr<ResourceMonitor> resource_monitor;
    {
      absl::MutexLock lock(resource_monitors_mutex_);
      auto it = resource_monitors_.find(identifier);
      if (it == resource_monitors_.end()) {
        resource_monitor =
            std::make_shared<ResourceMonitor>(identifier, data_source_.get());
        resource_monitors_[identifier] = resource_monitor;
      } else {
        resource_monitor = it->second;
      }
    }
    ECCLESIA_RETURN_IF_ERROR(resource_monitor->AddSubscriber(active_sub));
  }

  // Add the subscription to the active subscriptions map.
  {
    absl::MutexLock lock(active_subscriptions_mutex_);
    active_subscriptions_[sub_id] = active_sub;
  }
  active_sub->Begin();
  return sub_id;
}

absl::Status SubscriptionManagerImpl::Unsubscribe(
    const SubscriptionId& sub_id) {
  {
    absl::MutexLock lock(active_subscriptions_mutex_);
    auto it = active_subscriptions_.find(sub_id);
    if (it == active_subscriptions_.end()) {
      return absl::NotFoundError(
          absl::StrCat("Subscription ", sub_id, " not found for unsubscribe."));
    }
    LOG(WARNING) << "Unsubscribing " << sub_id;
  }
  OnSubscriptionEnd(sub_id, absl::CancelledError("Client unsubscribed."));
  return absl::OkStatus();
}

SubscriptionManagerImpl::SubscriptionId
SubscriptionManagerImpl::GenerateSubscriptionId() {
  return "sub_" + std::to_string(next_subscription_id_++);
}

nlohmann::json SubscriptionManagerImpl::ToJson() const {
  nlohmann::json json;
  json["next_subscription_id"] = next_subscription_id_.load();
  json["task_scheduler"] = task_scheduler_->ToJson();
  json["data_source"] = data_source_->GetName();
  {
    absl::MutexLock lock(active_subscriptions_mutex_);
    for (const auto& [id, sub] : active_subscriptions_) {
      json["active_subscriptions"].push_back(sub->ToJson());
    }
  }
  json["resource_monitors"] = nlohmann::json::array();
  {
    absl::MutexLock lock(resource_monitors_mutex_);
    for (const auto& [identifier, resource_monitor] : resource_monitors_) {
      json["resource_monitors"].push_back(resource_monitor->ToJson());
    }
  }
  return json;
}

}  // namespace milotic_hft
