#include "tlbmc/collector/sensor_collector.h"

#include <algorithm>
#include <climits>
#include <cstddef>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

#include "absl/container/flat_hash_map.h"
#include "absl/functional/any_invocable.h"
#include "absl/log/log.h"
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/strings/substitute.h"
#include "absl/time/time.h"
#include "absl/types/span.h"
#include "boost/asio.hpp"  //NOLINT: boost::asio is commonly used in BMC
#include "g3/macros.h"
#include "thread/thread.h"
#include "nlohmann/json.hpp"
#include "tlbmc/configs/entity_config.h"
#include "fan_controller_config.pb.h"
#include "fan_pwm_config.pb.h"
#include "fan_tach_config.pb.h"
#include "hwmon_temp_sensor_config.pb.h"
#include "psu_sensor_config.pb.h"
#include "shared_mem_sensor_config.pb.h"
#include "resource.pb.h"
#include "sensor.pb.h"
#include "tlbmc/sensors/fan_controller.h"
#include "tlbmc/sensors/fan_pwm.h"
#include "tlbmc/sensors/fan_tach.h"
#include "tlbmc/sensors/hwmon_temp_sensor.h"
#include "tlbmc/sensors/psu_sensor.h"
#include "tlbmc/sensors/sensor.h"
#include "tlbmc/sensors/shared_mem_based_sensor.h"
#include "tlbmc/time/time.h"
#include "google/protobuf/json/json.h"
#include "google/protobuf/util/json_util.h"

constexpr absl::Duration kDefaultSensorSamplingInterval =
    absl::Milliseconds(1000);

namespace milotic_tlbmc {

namespace {

constexpr absl::string_view kDefaultHwmonContext = "DEFAULT_HWMON_CONTEXT";
constexpr absl::string_view kDefaultPsuContext = "DEFAULT_PSU_CONTEXT";
constexpr absl::string_view kDefaultFanContext = "DEFAULT_FAN_CONTEXT";

void ResizeBufferAndMetrics(const std::shared_ptr<Sensor>& sensor,
                            size_t buffer_size_from_config) {
  if (buffer_size_from_config == INT_MIN) {
    return;
  }

  size_t default_buffer_size =
      sensor->GetSensorAttributesStatic().entity_common_config().queue_size();
  if (buffer_size_from_config > default_buffer_size) {
    sensor->ResizeBuffer(buffer_size_from_config);
  } else {
    // If the buffer size from the config is less than the default buffer size,
    // we need to resize the buffer to the default buffer size.
    // This covers both the cases where we need to reset the buffer size to the
    // default value and when a lower than default size is specified in the
    // config.
    sensor->ResizeBuffer(default_buffer_size);
  }
  sensor->ResetMetrics();
}

absl::Status ScheduleSensorRead(
    const std::vector<std::shared_ptr<Sensor>>& sensors,
    const SensorCollector::Params& params, ThreadManager& thread_manager) {
  LOG(INFO) << "Scheduling Sensor Read! Total sensor count is "
            << sensors.size();
  absl::flat_hash_map<absl::Duration, std::vector<std::weak_ptr<Sensor>>>
      sensors_by_interval;
  for (const auto& sensor : sensors) {
    absl::Duration interval = kDefaultSensorSamplingInterval;
    if (params.override_sensor_sampling_interval_ms.has_value()) {
      LOG(INFO) << "Overriding sensor sampling interval to "
                << *params.override_sensor_sampling_interval_ms;
      interval =
          absl::Milliseconds(*params.override_sensor_sampling_interval_ms);
    } else {
      absl::Duration static_refresh_interval = DecodeGoogleApiProto(
          sensor->GetSensorAttributesStatic().static_refresh_interval());
      if (static_refresh_interval > absl::ZeroDuration()) {
        interval = static_refresh_interval;
      }
    }

    Status status = sensor->GetSensorAttributesDynamic().state().status();
    if (status == STATUS_CREATION_FAILED || status == STATUS_CREATION_PENDING) {
      LOG(INFO) << "Skipping schedule sensor read for " << sensor->GetKey()
                << " because sensor initialization failed. Status: " << status;
      continue;
    }
    LOG(INFO) << "Scheduling Sensor Read! Sensor key is " << sensor->GetKey()
              << " and interval is " << interval;

    thread_manager.sensor_key_to_task_id[sensor->GetKey()] =
        thread_manager.task_scheduler->RunAndScheduleAsync(
            [sensor = std::weak_ptr<Sensor>(sensor),
             refresh_notification = params.refresh_notification](
                absl::AnyInvocable<void()> on_done) {
              std::shared_ptr<Sensor> sensor_locked = sensor.lock();
              if (!sensor_locked) {
                return;
              }

              sensor_locked->RefreshOnceAsync(
                  [refresh_notification = refresh_notification,
                   on_done = std::move(on_done)](
                      const std::shared_ptr<const SensorValue>&
                          sensor_data) mutable {
                    if (refresh_notification != nullptr) {
                      refresh_notification->NotifyWithData(sensor_data);
                    }
                    on_done();
                  });
            },
            interval);
  }
  LOG(INFO) << "Schedule done!";
  return absl::OkStatus();
}

// Returns the io_context for the given sensor group. If the io_context does not
// exist, it will be created. SensorGroup should be a unique identifier for
// sensors that share the same i2c device or are otherwise related to optimize
// reading speed for HFT and minimize contention over the bus mutex. See
// b/434024387 for details.
std::shared_ptr<boost::asio::io_context> GetOrCreateIoContextForSensorGroup(
    absl::flat_hash_map<std::string, std::shared_ptr<boost::asio::io_context>>&
        sensor_group_to_io_context,
    absl::string_view sensor_group) {
  auto& group_io_context = sensor_group_to_io_context[sensor_group];
  if (group_io_context == nullptr) {
    group_io_context = std::make_shared<boost::asio::io_context>();
  }
  return group_io_context;
}

absl::Status CreateHwmonSensors(
    const SensorCollector::Params& params,
    std::vector<std::shared_ptr<Sensor>>& sensors,
    absl::flat_hash_map<std::string, std::shared_ptr<boost::asio::io_context>>&
        sensor_group_to_io_context) {
  ECCLESIA_ASSIGN_OR_RETURN(
      absl::Span<const HwmonTempSensorConfig> hwmon_temp_sensor_configs,
      params.entity_config.GetAllHwmonTempSensorConfigs());
  std::vector<std::shared_ptr<Sensor>> hwmon_temp_sensors;
  size_t count_sensors = 0;
  for (const auto& config : hwmon_temp_sensor_configs) {
    absl::string_view sensor_group = config.has_sensor_group()
                                         ? config.sensor_group()
                                         : kDefaultHwmonContext;
    ECCLESIA_ASSIGN_OR_RETURN(
        hwmon_temp_sensors,
        HwmonTempSensor::Create(config,
                                GetOrCreateIoContextForSensorGroup(
                                    sensor_group_to_io_context, sensor_group),
                                params.i2c_sysfs));
    // We want to use syslog to track device creation status
    LOG(INFO) << absl::Substitute("Created $0 HWmon sensors at $1",
                                  hwmon_temp_sensors.size(),
                                  config.i2c_common_config());
    count_sensors += hwmon_temp_sensors.size();
    sensors.insert(sensors.end(), hwmon_temp_sensors.begin(),
                   hwmon_temp_sensors.end());
  }
  return absl::OkStatus();
}

absl::Status CreatePsuSensors(
    const SensorCollector::Params& params,
    std::vector<std::shared_ptr<Sensor>>& sensors,
    absl::flat_hash_map<std::string, std::shared_ptr<boost::asio::io_context>>&
        sensor_group_to_io_context) {
  ECCLESIA_ASSIGN_OR_RETURN(
      absl::Span<const PsuSensorConfig> psu_sensor_configs,
      params.entity_config.GetAllPsuSensorConfigs());
  size_t count_sensors = 0;
  for (const auto& config : psu_sensor_configs) {
    absl::string_view sensor_group =
        config.has_sensor_group() ? config.sensor_group() : kDefaultPsuContext;
    ECCLESIA_ASSIGN_OR_RETURN(
        std::vector<std::shared_ptr<Sensor>> psu_sensors,
        PsuSensor::Create(config,
                          GetOrCreateIoContextForSensorGroup(
                              sensor_group_to_io_context, sensor_group),
                          params.i2c_sysfs));
    // We want to use syslog to track device creation status
    LOG(INFO) << absl::Substitute("Created $0 PSU sensors at $1",
                                  psu_sensors.size(),
                                  config.i2c_common_config());
    count_sensors += psu_sensors.size();
    sensors.insert(sensors.end(), psu_sensors.begin(), psu_sensors.end());
  }
  return absl::OkStatus();
}

absl::Status CreateFanSensors(
    const SensorCollector::Params& params,
    absl::Span<const std::shared_ptr<FanController>> fan_controllers,
    std::vector<std::shared_ptr<Sensor>>& sensors,
    absl::flat_hash_map<std::string, std::shared_ptr<boost::asio::io_context>>&
        sensor_group_to_io_context) {
  ECCLESIA_ASSIGN_OR_RETURN(absl::Span<const FanPwmConfig> fan_pwm_configs,
                            params.entity_config.GetAllFanPwmConfigs());

  int count = 0;
  for (const auto& config : fan_pwm_configs) {
    for (const auto& fan_controller : fan_controllers) {
      if (!fan_controller->ControllerHasSensor(config.i2c_common_config())) {
        continue;
      }
      absl::string_view sensor_group = config.has_sensor_group()
                                           ? config.sensor_group()
                                           : kDefaultFanContext;
      ECCLESIA_ASSIGN_OR_RETURN(
          std::shared_ptr<Sensor> fan_pwm,
          FanPwm::Create(config, *fan_controller,
                         GetOrCreateIoContextForSensorGroup(
                             sensor_group_to_io_context, sensor_group),
                         params.i2c_sysfs));
      sensors.push_back(std::move(fan_pwm));
      count++;
      break;
    }
  }
  ECCLESIA_ASSIGN_OR_RETURN(absl::Span<const FanTachConfig> fan_tach_configs,
                            params.entity_config.GetAllFanTachConfigs());
  for (const auto& config : fan_tach_configs) {
    for (const auto& fan_controller : fan_controllers) {
      if (!fan_controller->ControllerHasSensor(config.i2c_common_config())) {
        continue;
      }
      absl::string_view sensor_group = config.has_sensor_group()
                                           ? config.sensor_group()
                                           : kDefaultFanContext;
      ECCLESIA_ASSIGN_OR_RETURN(
          std::shared_ptr<Sensor> fan_tach,
          FanTachometer::Create(config, *fan_controller,
                                GetOrCreateIoContextForSensorGroup(
                                    sensor_group_to_io_context, sensor_group),
                                params.i2c_sysfs));
      sensors.push_back(std::move(fan_tach));
      count++;
      break;
    }
  }
  // We want to use syslog to track device creation status
  LOG(INFO) << absl::Substitute("Created $0 Fan PWM/Tach sensors", count);
  return absl::OkStatus();
}

absl::Status CreateSharedMemSensors(
    const SensorCollector::Params& params,
    std::vector<std::shared_ptr<Sensor>>& sensors,
    ThreadManager& thread_manager) {
  ECCLESIA_ASSIGN_OR_RETURN(
      absl::Span<const SharedMemSensorConfig> shared_mem_sensor_configs,
      params.entity_config.GetAllSharedMemSensorConfigs());
  auto io_context = std::make_shared<boost::asio::io_context>();
  boost::asio::executor_work_guard<boost::asio::io_context::executor_type>
      work_guard(boost::asio::make_work_guard(*io_context));
  int count = 0;
  for (const auto& config : shared_mem_sensor_configs) {
    ECCLESIA_ASSIGN_OR_RETURN(std::shared_ptr<Sensor> shared_mem_sensor,
                              SharedMemBasedSensor::Create(config, io_context));
    // We want to use syslog to track device creation status
    LOG(INFO) << absl::Substitute("Created SharedMem sensor: $0",
                                  config.name());
    sensors.push_back(std::move(shared_mem_sensor));
    count++;
  }
  if (count > 0) {
    thread_manager.threads.push_back(
        params.thread_factory->New([io_context]() { io_context->run(); }));
    thread_manager.work_guards.push_back(std::move(work_guard));
    thread_manager.io_contexts.push_back(std::move(io_context));
  }
  return absl::OkStatus();
}

absl::StatusOr<std::vector<std::shared_ptr<FanController>>>
CreateFanControllers(const SensorCollector::Params& params) {
  ECCLESIA_ASSIGN_OR_RETURN(
      absl::Span<const FanControllerConfig> fan_controller_configs,
      params.entity_config.GetAllFanControllerConfigs());
  std::vector<std::shared_ptr<FanController>> fan_controllers;
  for (const auto& config : fan_controller_configs) {
    ECCLESIA_ASSIGN_OR_RETURN(std::shared_ptr<FanController> fan_controller,
                              FanController::Create(config, params.i2c_sysfs));
    // We want to use syslog to track device creation status
    LOG(INFO) << absl::Substitute("Created 1 fan controller at $0",
                                  config.i2c_common_config());
    fan_controllers.push_back(std::move(fan_controller));
  }
  return fan_controllers;
}

absl::flat_hash_map<std::string,
                    absl::flat_hash_map<std::string, std::shared_ptr<Sensor>>>
CreateSensorsTable(std::vector<std::shared_ptr<Sensor>>&& sensors) {
  absl::flat_hash_map<std::string,
                      absl::flat_hash_map<std::string, std::shared_ptr<Sensor>>>
      sensor_table;
  for (std::shared_ptr<Sensor>& sensor : sensors) {
    sensor_table[sensor->GetConfigName()][sensor->GetKey()] = std::move(sensor);
  }
  return sensor_table;
}

}  // namespace

ThreadManager::~ThreadManager() {
  // Stop the scheduler first.
  task_scheduler->Stop();

  // Stop io_contexts.
  for (const std::shared_ptr<boost::asio::io_context>& io_context :
       io_contexts) {
    io_context->stop();
  }

  // Finally join all threads.
  for (const std::unique_ptr<ecclesia::ThreadInterface>& thread : threads) {
    thread->Join();
  }
}

nlohmann::json SensorCollector::ToJson() const {
  nlohmann::json::object_t response;
  for (const auto& [board_config_name, key_to_sensor] : sensor_table_) {
    nlohmann::json::object_t sensors;
    for (const auto& [key, sensor] : key_to_sensor) {
      std::string json_string;
      ::google::protobuf::util::JsonPrintOptions opts;
      opts.preserve_proto_field_names = true;
      std::shared_ptr<const SensorValue> sensor_data = sensor->GetSensorData();
      if (sensor_data == nullptr ||
          !::google::protobuf::json::MessageToJsonString(*sensor_data, &json_string, opts)
               .ok()) {
        LOG(ERROR) << "Failed to get sensor data for " << key;
        continue;
      }
      nlohmann::json& sensor_json = sensors[key];
      sensor_json["Value"] = nlohmann::json::parse(json_string, nullptr, false);
      json_string.clear();

      if (!::google::protobuf::json::MessageToJsonString(sensor->GetSensorMetrics(),
                                               &json_string, opts)
               .ok()) {
        LOG(ERROR) << "Failed to get sensor metrics for " << key;
        continue;
      }
      sensor_json["Metrics"] =
          nlohmann::json::parse(json_string, nullptr, false);
      json_string.clear();

      if (!::google::protobuf::json::MessageToJsonString(
               sensor->GetSensorAttributesStatic(), &json_string, opts)
               .ok()) {
        LOG(ERROR) << "Failed to convert SensorStaticAttributes to JSON";
        continue;
      }
      sensor_json["StaticAttributes"] =
          nlohmann::json::parse(json_string, nullptr, false);

      json_string.clear();
      if (!::google::protobuf::json::MessageToJsonString(
               sensor->GetSensorAttributesDynamic(), &json_string, opts)
               .ok()) {
        LOG(ERROR) << "Failed to convert SensorDynamicAttributes to JSON";
        continue;
      }
      sensor_json["DynamicAttributes"] =
          nlohmann::json::parse(json_string, nullptr, false);
    }
    response[board_config_name] = sensors;
  }
  return response;
}

SensorCollector::SensorCollector(std::vector<std::shared_ptr<Sensor>>&& sensors,
                                 std::unique_ptr<ThreadManager> thread_manager,
                                 const SensorNotification* refresh_notification)
    : sensor_table_(CreateSensorsTable(std::move(sensors))),
      thread_manager_(std::move(thread_manager)),
      refresh_notification_(refresh_notification) {}

std::vector<std::string> SensorCollector::GetAllSensorKeysByConfigName(
    const std::string& board_config_name) const {
  std::vector<std::string> sensor_keys;
  if (auto it = sensor_table_.find(board_config_name);
      it != sensor_table_.end()) {
    for (const auto& [key, _] : it->second) {
      sensor_keys.push_back(key);
    }
  }
  std::sort(sensor_keys.begin(), sensor_keys.end());
  return sensor_keys;
}

std::shared_ptr<const Sensor>
SensorCollector::GetSensorByConfigNameAndSensorKey(
    const std::string& board_config_name, const std::string& sensor_key) const {
  auto board_it = sensor_table_.find(board_config_name);
  if (board_it == sensor_table_.end()) {
    return nullptr;
  }
  auto sensor_it = board_it->second.find(sensor_key);
  if (sensor_it == board_it->second.end()) {
    return nullptr;
  }
  return sensor_it->second;
}

std::vector<std::shared_ptr<const Sensor>> SensorCollector::GetAllSensors()
    const {
  std::vector<std::shared_ptr<const Sensor>> all_sensors;
  for (const auto& [_, key_to_sensor] : sensor_table_) {
    for (const auto& [_, sensor] : key_to_sensor) {
      all_sensors.push_back(sensor);
    }
  }
  std::sort(all_sensors.begin(), all_sensors.end(),
            [](const std::shared_ptr<const Sensor>& a,
               const std::shared_ptr<const Sensor>& b) {
              return a->GetKey() < b->GetKey();
            });
  return all_sensors;
}

std::shared_ptr<const Sensor> SensorCollector::GetSensorBySensorKey(
    const std::string& sensor_key) const {
  for (const auto& [_, key_to_sensor] : sensor_table_) {
    auto it = key_to_sensor.find(sensor_key);
    if (it != key_to_sensor.end()) {
      return it->second;
    }
  }
  return nullptr;
}

absl::Status SensorCollector::ConfigureCollection(const Config& config) const {
  auto get_sampling_interval = [](const std::shared_ptr<Sensor>& sensor,
                                  const Config& config) {
    if (config.sampling_interval_ms > 0) {
      return absl::Milliseconds(config.sampling_interval_ms);
    }

    // If the sampling interval is not set in the config, then we use the static
    // refresh interval of the sensor if it is set. Otherwise, we use the
    // default sensor sampling interval.
    absl::Duration static_refresh_interval = DecodeGoogleApiProto(
        sensor->GetSensorAttributesStatic().static_refresh_interval());
    if (static_refresh_interval > absl::ZeroDuration()) {
      LOG(WARNING) << "Using static refresh interval: "
                   << static_refresh_interval
                   << " for sensor: " << sensor->GetKey();
      return static_refresh_interval;
    }
    LOG(WARNING) << "Using default sensor sampling interval: "
                 << kDefaultSensorSamplingInterval
                 << " for sensor: " << sensor->GetKey();
    return kDefaultSensorSamplingInterval;
  };

  bool any_sensor_configured = false;
  for (auto& [_, key_to_sensor] : sensor_table_) {
    // If the key is empty, then we need to configure all the sensors in the
    // board config.
    if (config.key.empty()) {
      for (auto& [key, sensor] : key_to_sensor) {
        absl::Duration interval = get_sampling_interval(sensor, config);
        ResizeBufferAndMetrics(sensor, config.max_batch_size);
        // It's assumed sensor_key_to_task_id always contains the key if it's in
        // sensor_table_
        thread_manager_->task_scheduler->UpdateTaskPeriod(
            thread_manager_->sensor_key_to_task_id.at(key), interval);
        sensor->UpdateSensorSamplingInterval(interval);
        any_sensor_configured = true;
      }
    }

    // If the key is not empty, then we need to configure the sensor with the
    // given key.
    auto it = key_to_sensor.find(config.key);
    if (it != key_to_sensor.end()) {
      absl::Duration interval = get_sampling_interval(it->second, config);
      thread_manager_->task_scheduler->UpdateTaskPeriod(
          thread_manager_->sensor_key_to_task_id.at(config.key), interval);
      ResizeBufferAndMetrics(it->second, config.max_batch_size);
      it->second->UpdateSensorSamplingInterval(interval);
      any_sensor_configured = true;
      break;
    }
  }

  return any_sensor_configured
             ? absl::OkStatus()
             : absl::NotFoundError("No sensor found for configuration");
}

absl::StatusOr<std::unique_ptr<SensorCollector>> SensorCollector::Create(
    const Params& params) {
  std::vector<std::shared_ptr<Sensor>> sensors;
  auto thread_manager = std::make_unique<ThreadManager>(params.clock);
  absl::flat_hash_map<std::string, std::shared_ptr<boost::asio::io_context>>
      sensor_group_to_io_context;

  ECCLESIA_RETURN_IF_ERROR(
      CreateHwmonSensors(params, sensors, sensor_group_to_io_context));

  ECCLESIA_RETURN_IF_ERROR(
      CreatePsuSensors(params, sensors, sensor_group_to_io_context));

  ECCLESIA_ASSIGN_OR_RETURN(
      std::vector<std::shared_ptr<FanController>> fan_controllers,
      CreateFanControllers(params));

  ECCLESIA_RETURN_IF_ERROR(CreateFanSensors(params, fan_controllers, sensors,
                                            sensor_group_to_io_context));

  ECCLESIA_RETURN_IF_ERROR(
      CreateSharedMemSensors(params, sensors, *thread_manager));

  for (const auto& [sensor_group, io_context] : sensor_group_to_io_context) {
    LOG(INFO) << "Starting io_context thread for sensor group: "
              << sensor_group;
    boost::asio::executor_work_guard<boost::asio::io_context::executor_type>
        work_guard(boost::asio::make_work_guard(*io_context));
    thread_manager->work_guards.push_back(std::move(work_guard));
    thread_manager->io_contexts.push_back(io_context);
    thread_manager->threads.push_back(
        params.thread_factory->New([io_context]() { io_context->run(); }));
  }

  ECCLESIA_RETURN_IF_ERROR(
      ScheduleSensorRead(sensors, params, *thread_manager));

  return absl::WrapUnique(new SensorCollector(std::move(sensors),
                                              std::move(thread_manager),
                                              params.refresh_notification));
}

std::unique_ptr<EmptySensorCollector> EmptySensorCollector::Create() {
  return std::make_unique<EmptySensorCollector>();
}

// Returns the sorted list of sensor names contained by the given Config name.
std::vector<std::string> EmptySensorCollector::GetAllSensorKeysByConfigName(
    const std::string& board_config_name) const {
  LOG(WARNING) << "EmptySensorCollector::GetAllSensorKeysByConfigName is "
                  "called. This will return an empty list.";
  return {};
}

// Returns all the sensors sorted by sensor name.
std::vector<std::shared_ptr<const Sensor>> EmptySensorCollector::GetAllSensors()
    const {
  LOG(WARNING) << "EmptySensorCollector::GetAllSensorKeysByConfigName is "
                  "called. This will return an empty list.";
  return {};
}

// Returns the sensor for the given sensor key.
std::shared_ptr<const Sensor> EmptySensorCollector::GetSensorBySensorKey(
    const std::string& sensor_key) const {
  LOG(WARNING) << "EmptySensorCollector::GetSensorBySensorKey is "
                  "called. This will return a nullptr.";
  return nullptr;
}

absl::Status EmptySensorCollector::ConfigureCollection(
    const Config& config) const {
  return absl::UnimplementedError(
      "EmptySensorCollector::ConfigureCollection is called. This is not "
      "implemented.");
}

std::shared_ptr<const Sensor>
EmptySensorCollector::GetSensorByConfigNameAndSensorKey(
    const std::string& board_config_name, const std::string& sensor_key) const {
  LOG(WARNING) << "EmptySensorCollector::GetSensorByConfigNameAndSensorKey is "
                  "called. This will return a nullptr.";
  return nullptr;
}

nlohmann::json EmptySensorCollector::GetSchedulerStats() const {
  return nlohmann::json::parse("{\"Warning\": \"EmptySensorCollector used.\"}");
}

nlohmann::json EmptySensorCollector::ToJson() const {
  return nlohmann::json::parse("{\"Warning\": \"EmptySensorCollector used.\"}");
}

}  // namespace milotic_tlbmc
