#ifndef THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SUBSCRIPTION_MANAGER_IMPL_H_
#define THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SUBSCRIPTION_MANAGER_IMPL_H_

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <utility>

#include "tlbmc/entity.pb.h"
#include "tlbmc/feed_client_interface.h"
#include "tlbmc/service_messages.pb.h"
#include "one/offline_node_entities.pb.h"
#include "one/resolved_entities.pb.h"
#include "one/public_offline_node_entities.h"
#include "absl/base/attributes.h"
#include "absl/base/thread_annotations.h"
#include "absl/container/btree_set.h"
#include "absl/container/flat_hash_map.h"
#include "absl/functional/any_invocable.h"
#include "absl/hash/hash.h"
#include "absl/log/log.h"
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "nlohmann/json_fwd.hpp"
#include "tlbmc/adapter/data_source.h"
#include "tlbmc/subscription/manager.h"
#include "identifier.pb.h"
#include "payload.pb.h"
#include "sensor_identifier.pb.h"
#include "sensor_payload.pb.h"
#include "subscription_params.pb.h"
#include "tlbmc/scheduler/scheduler.h"

namespace milotic_hft {

struct IdentifierHash {
  size_t operator()(const milotic_hft::Identifier& identifier) const {
    return absl::HashOf(absl::StrCat(identifier));
  }
};

struct IdentifierEqual {
  bool operator()(const milotic_hft::Identifier& a,
                  const milotic_hft::Identifier& b) const {
    return absl::StrCat(a) == absl::StrCat(b);
  }
};

// Forward declaration of ActiveSubscription.
class ActiveSubscription;

// Manages the sampling interval of a resource.
// It tracks the sampling interval of a resource and the subscribers
// of the resource. When a new subscriber is added, the effective sampling
// interval of the resource is updated to be the minimum of the sampling
// intervals of all subscribers.
// When a subscriber is removed, the effective sampling interval is updated to
// be the minimum of the sampling intervals of the remaining subscribers.
class ResourceMonitor {
 public:
  using SubscriptionId = SubscriptionManager::SubscriptionId;

  explicit ResourceMonitor(const milotic_hft::Identifier& identifier,
                           DataSource* data_source)
      : identifier_(identifier),
        current_sampling_interval_ms_(0),
        data_source_(data_source) {}

  const milotic_hft::Identifier& GetIdentifier() const { return identifier_; }

  absl::Status AddSubscriber(
      const std::shared_ptr<ActiveSubscription>& subscription)
      ABSL_LOCKS_EXCLUDED(subscribers_mutex_);

  absl::Status RemoveSubscriber(const SubscriptionId& id)
      ABSL_LOCKS_EXCLUDED(subscribers_mutex_);

  absl::Status ConfigureDataSource() ABSL_LOCKS_EXCLUDED(subscribers_mutex_);

  int GetCurrentSamplingIntervalMs() const {
    return current_sampling_interval_ms_;
  }

  nlohmann::json ToJson() const;

 protected:
  const milotic_hft::Identifier identifier_;
  std::atomic<int> current_sampling_interval_ms_;
  std::atomic<int> current_max_batch_size_;
  mutable absl::Mutex subscribers_mutex_;
  absl::flat_hash_map<SubscriptionManager::SubscriptionId,
                      std::shared_ptr<ActiveSubscription>>
      subscribers_ ABSL_GUARDED_BY(subscribers_mutex_);
  absl::btree_multiset<int> lowest_sampling_interval_subscriptions_
      ABSL_GUARDED_BY(subscribers_mutex_);
  DataSource* data_source_;
  absl::btree_multiset<int, std::greater<>> max_batch_sizes_
      ABSL_GUARDED_BY(subscribers_mutex_);
};

class ActiveSubscription
    : public std::enable_shared_from_this<ActiveSubscription> {
 public:
  using SubscriptionId = SubscriptionManager::SubscriptionId;

  ActiveSubscription(
      const SubscriptionId& id, const milotic_hft::SubscriptionParams& params,
      absl::AnyInvocable<void(Payload&&)> on_data_callback,
      absl::AnyInvocable<void(const SubscriptionId&, const absl::Status&)>
          on_subscription_ended,
      milotic_tlbmc::TaskScheduler* task_scheduler, DataSource* data_source);

  ~ActiveSubscription() {
    if (task_scheduler_ != nullptr) {
      task_scheduler_->Cancel(task_id_);
    }
    LOG(WARNING) << "ActiveSubscription " << id_ << " destroyed.";
  }

  void Begin();

  SubscriptionId GetId() const { return id_; }

  const milotic_hft::SubscriptionParams& GetParams() const { return params_; }

  void DeliverData(Payload&& data);

  bool IsActive() const { return batches_remaining_ != 0; }

  void OnSubscriptionEnd(const SubscriptionId& id, const absl::Status& status) {
    on_subscription_ended_callback_(id, status);
  }

  nlohmann::json ToJson() const;

 private:
  std::atomic<int> task_id_ = -1;  // -1 means no task is scheduled.
  const SubscriptionId id_;
  const milotic_hft::SubscriptionParams params_;
  absl::AnyInvocable<void(Payload&&)> on_data_callback_;
  absl::AnyInvocable<void(const SubscriptionId&, const absl::Status&)>
      on_subscription_ended_callback_;
  std::atomic<int> batches_remaining_ = -1;  // -1 means unlimited.
  absl::flat_hash_map<milotic_hft::Identifier, absl::Time, IdentifierHash,
                      IdentifierEqual>
      identifier_to_last_sampled_time_
          ABSL_GUARDED_BY(identifier_to_last_sampled_time_ms_mutex_);
  mutable absl::Mutex identifier_to_last_sampled_time_ms_mutex_;
  milotic_tlbmc::TaskScheduler* task_scheduler_;
  DataSource* data_source_;
};

class SubscriptionManagerImpl : public SubscriptionManager {
 public:
  using SubscriptionId = SubscriptionManager::SubscriptionId;

  struct Options {
    absl::Duration get_policies_interval = absl::Minutes(15);
    platforms_syshealth::collection::feed::EndpointIdentifier::EndpointType
        endpoint_type ABSL_REQUIRE_EXPLICIT_INIT;
    std::string offline_node_entities_path;
    bool get_policies_on_startup = true;
  };

  ~SubscriptionManagerImpl() override;

  // Creates a subscription manager.
  static std::unique_ptr<SubscriptionManagerImpl> Create(
      std::unique_ptr<DataSource> data_source) {
    return absl::WrapUnique(
        new SubscriptionManagerImpl(std::move(data_source)));
  }

  // Creates a subscription manager with feed client.
  static std::unique_ptr<SubscriptionManagerImpl> Create(
      std::unique_ptr<DataSource> data_source,
      platforms_syshealth::collection::feed::FeedClientInterface* feed_client,
      const Options& options) {
    platforms_syshealth::collection::feed::Entity entity;
    entity.set_type(
        platforms_syshealth::collection::feed::Entity::MACHINE_NODE);
    absl::StatusOr<
        production_msv::node_entities_proto::OfflineNodeEntityInformation>
        node_entity_info;
    if (!options.offline_node_entities_path.empty()) {
      node_entity_info =
          production_msv::node_entities::ReadOfflineNodeEntityInformation(
              options.offline_node_entities_path);
    } else {
      node_entity_info =
          production_msv::node_entities::ReadOfflineNodeEntityInformation();
    }
    if (!node_entity_info.ok()) {
      LOG(ERROR) << "Failed to read offline node entity information: "
                 << node_entity_info.status();
      return nullptr;
    }
    entity.set_type(
        platforms_syshealth::collection::feed::Entity::MACHINE_NODE);
    entity.mutable_machine_node_entity()->set_machine_name(
        node_entity_info->resolved_config().machine_name());
    entity.mutable_machine_node_entity()->set_node_entity_tag(
        node_entity_info->entity_tag());
    if (node_entity_info->resolved_config().entities().contains(
            node_entity_info->entity_tag())) {
      entity.mutable_machine_node_entity()->set_hostname(
          node_entity_info->resolved_config()
              .entities()
              .at(node_entity_info->entity_tag())
              .hostname());
      entity.mutable_metadata()->set_hostname(
          node_entity_info->resolved_config()
              .entities()
              .at(node_entity_info->entity_tag())
              .hostname());
    }
    return absl::WrapUnique(new SubscriptionManagerImpl(
        std::move(data_source), feed_client, options, entity));
  }

  absl::StatusOr<SubscriptionId> AddSubscription(
      const SubscriptionParams& params,
      absl::AnyInvocable<void(Payload&&)> on_data_callback) override
      ABSL_LOCKS_EXCLUDED(active_subscriptions_mutex_);

  absl::Status Unsubscribe(const SubscriptionId& sub_id) override
      ABSL_LOCKS_EXCLUDED(active_subscriptions_mutex_);

  nlohmann::json ToJson() const override;

  absl::Status StartPolicyPolling();

 private:
  explicit SubscriptionManagerImpl(std::unique_ptr<DataSource> data_source);
  SubscriptionManagerImpl(
      std::unique_ptr<DataSource> data_source,
      platforms_syshealth::collection::feed::FeedClientInterface* feed_client,
      const Options& options,
      const platforms_syshealth::collection::feed::Entity& entity);

  void PullAgentPoliciesAndUpdateSubscriptions();
  SubscriptionId GenerateSubscriptionId();
  void OnSubscriptionEnd(const SubscriptionId& subscription_id,
                         const absl::Status& status)
      ABSL_LOCKS_EXCLUDED(active_subscriptions_mutex_,
                          resource_monitors_mutex_);

  std::atomic<size_t> next_subscription_id_;
  std::unique_ptr<milotic_tlbmc::TaskScheduler> task_scheduler_;
  mutable absl::Mutex active_subscriptions_mutex_;
  absl::flat_hash_map<SubscriptionId, std::shared_ptr<ActiveSubscription>>
      active_subscriptions_ ABSL_GUARDED_BY(active_subscriptions_mutex_);
  std::unique_ptr<DataSource> data_source_;

  mutable absl::Mutex resource_monitors_mutex_;
  absl::flat_hash_map<milotic_hft::Identifier, std::shared_ptr<ResourceMonitor>,
                      IdentifierHash, IdentifierEqual>
      resource_monitors_ ABSL_GUARDED_BY(resource_monitors_mutex_);

  platforms_syshealth::collection::feed::FeedClientInterface* feed_client_ =
      nullptr;
  absl::Duration get_policies_interval_;
  std::atomic<int> get_policies_task_id_ = -1;
  absl::flat_hash_map<std::string, SubscriptionId> policy_to_subscription_ids_
      ABSL_GUARDED_BY(active_subscriptions_mutex_);
  absl::flat_hash_map<SubscriptionId, std::string> subscription_to_policy_ids_
      ABSL_GUARDED_BY(active_subscriptions_mutex_);
  const platforms_syshealth::collection::feed::EndpointIdentifier::EndpointType
      endpoint_type_ = platforms_syshealth::collection::feed::
          EndpointIdentifier::ENDPOINT_TYPE_UNSPECIFIED;
  platforms_syshealth::collection::feed::Entity entity_;
};

}  // namespace milotic_hft

#endif  // THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SUBSCRIPTION_MANAGER_IMPL_H_
