#include "tlbmc/sensors/sensor.h"

#include <algorithm>
#include <cstdint>
#include <iterator>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

#include "google/protobuf/duration.pb.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/strings/substitute.h"
#include "absl/synchronization/mutex.h"
#include "g3/macros.h"
#include "entity_common_config.pb.h"
#include "reading_range_config.pb.h"
#include "tlbmc/configs/subscription_config.h"
#include "threshold_config.pb.h"
#include "resource.pb.h"
#include "sensor.pb.h"

namespace milotic_tlbmc {

namespace {

// Converts a nanosecond count into a google::protobuf::Duration by splitting
// it into whole seconds and the sub-second nanosecond remainder.
google::protobuf::Duration GetDurationFromNanoseconds(uint64_t nanoseconds) {
  constexpr uint64_t kNanosecondsPerSecond = 1'000'000'000;
  const uint64_t whole_seconds = nanoseconds / kNanosecondsPerSecond;
  const uint64_t leftover_nanos = nanoseconds % kNanosecondsPerSecond;

  google::protobuf::Duration result;
  result.set_seconds(static_cast<int64_t>(whole_seconds));
  // The remainder is always below one billion, so it fits in an int32_t.
  result.set_nanos(static_cast<int32_t>(leftover_nanos));
  return result;
}

}  // namespace

// Builds the static attributes message for a sensor.
//
// The result carries the sensor unit, uses `sensor_name` as the attribute key,
// selects the periodic refresh mode, and holds copies of the supplied common,
// threshold and reading-range configs. The refresh interval from
// `entity_common_config` is copied both into the refresh policy and into the
// static refresh interval field.
SensorAttributesStatic Sensor::CreateStaticAttributes(
    absl::string_view sensor_name, const SensorUnit &sensor_unit,
    const EntityCommonConfig &entity_common_config,
    const ThresholdConfigs &threshold_configs,
    const ReadingRangeConfigs &reading_range_configs) {
  SensorAttributesStatic result;
  result.set_unit(sensor_unit);

  // Identity and refresh policy.
  auto *attributes = result.mutable_attributes();
  attributes->set_key(sensor_name);
  auto *refresh_policy = attributes->mutable_refresh_policy();
  refresh_policy->set_refresh_mode(REFRESH_MODE_PERIODIC);
  *refresh_policy->mutable_refresh_interval() =
      entity_common_config.refresh_interval();

  // Configuration copies.
  *result.mutable_entity_common_config() = entity_common_config;
  *result.mutable_thresholds() = threshold_configs;
  *result.mutable_reading_ranges() = reading_range_configs;
  *result.mutable_static_refresh_interval() =
      entity_common_config.refresh_interval();
  return result;
}

// Registers `subscription_params` as a subscriber of this sensor's data.
//
// Returns:
//   - InvalidArgumentError if `subscription_params` is null, or if the
//     requested batch size is negative or larger than the queue size from the
//     sensor's entity common config.
//   - Any error produced by GetBatchSizeForSubscription().
//   - OkStatus otherwise.
//
// The pointer itself (not a copy of the params) is stored in
// `subscription_params_`.
// NOTE(review): presumably the caller must keep the params alive until
// Unsubscribe() is called - confirm against the callers.
absl::Status Sensor::Subscribe(const SubscriptionParams *subscription_params) {
  if (subscription_params == nullptr) {
    return absl::InvalidArgumentError("Subscription params must not be null.");
  }

  ECCLESIA_ASSIGN_OR_RETURN(int batch_size,
                            subscription_params->GetBatchSizeForSubscription());

  const uint64_t max_queue_size =
      sensor_attributes_static_.entity_common_config().queue_size();
  // Check the sign explicitly rather than relying on the implicit
  // int -> uint64_t conversion to reject negative batch sizes.
  if (batch_size < 0 || static_cast<uint64_t>(batch_size) > max_queue_size) {
    return absl::InvalidArgumentError(absl::Substitute(
        "Max queue size $0 is smaller than the requested batch size $1",
        max_queue_size, batch_size));
  }

  absl::MutexLock lock(&sensor_attributes_dynamic_mutex_);
  subscription_params_.insert(subscription_params);

  // Only the subscription at the front of `subscription_params_` (or the very
  // first subscription) drives the notification batch size.
  // NOTE(review): this assumes the container orders subscriptions so that
  // begin() is the one with the smallest batch size - confirm against its
  // comparator.
  const bool no_batch_size_yet =
      sensor_attributes_dynamic_.data_points_to_notify() == 0;
  const bool is_front_subscription =
      *subscription_params_.begin() == subscription_params;
  if (no_batch_size_yet || is_front_subscription) {
    sensor_attributes_dynamic_.set_data_points_to_notify(batch_size);

    // Set the refresh interval for the sensor based on the subscription
    // parameters.
    *sensor_attributes_dynamic_.mutable_refresh_interval() =
        GetDurationFromNanoseconds(subscription_params->sample_interval_ns);

    // Restart the count of buffered points. This is needed in case the new
    // batch size is smaller than the number of points already counted.
    sensor_attributes_dynamic_.set_data_points_buffered(0);
  }
  return absl::OkStatus();
}

// Removes `subscription_params` from this sensor's set of subscriptions.
//
// Returns NotFoundError when the sensor has no subscriptions at all. Otherwise
// the entry is erased and the notification batch size is recomputed: 0 when no
// subscriptions remain, else the batch size of the subscription now at the
// front of the set. An error from GetBatchSizeForSubscription() is propagated.
//
// The result of erase() is not inspected, so OkStatus is also returned when
// `subscription_params` was not in a non-empty set.
// NOTE(review): the dynamic refresh interval is not touched here, unlike in
// Subscribe() - confirm whether that is intentional.
absl::Status Sensor::Unsubscribe(
    const SubscriptionParams *subscription_params) {
  absl::MutexLock dynamic_attributes_lock(&sensor_attributes_dynamic_mutex_);
  if (subscription_params_.empty()) {
    return absl::NotFoundError("No subscriptions to unsubscribe from.");
  }

  subscription_params_.erase(subscription_params);

  if (subscription_params_.empty()) {
    // Nobody is listening any more; a batch size of 0 disables notification.
    sensor_attributes_dynamic_.set_data_points_to_notify(0);
  } else {
    const auto &front_subscription = *subscription_params_.begin();
    ECCLESIA_ASSIGN_OR_RETURN(
        int remaining_batch_size,
        front_subscription->GetBatchSizeForSubscription());
    sensor_attributes_dynamic_.set_data_points_to_notify(remaining_batch_size);
  }
  return absl::OkStatus();
}

// Appends `sensor_data` to the sensor's data buffer.
//
// Without a notification callback, the buffered-point counter in the dynamic
// attributes is set to the buffer's size. With a callback, the counter is
// incremented instead; when it reaches `data_points_to_notify` it is reset to
// 0 and the callback is invoked, outside of any lock, with the most recent
// `data_points_to_notify` entries of the buffer.
//
// The two mutexes are never held at the same time.
// NOTE(review): because of that, a concurrent StoreSensorData() call may push
// between the counter update and the batch copy - confirm whether callers
// serialize stores per sensor.
void Sensor::StoreSensorData(
    const std::shared_ptr<const SensorValue> &sensor_data) {
  if (!notification_cb_.has_value()) {
    int buffered_count = 0;
    {
      absl::MutexLock lock(&sensor_data_mutex_);
      sensor_data_.push_back(sensor_data);
      buffered_count = static_cast<int>(sensor_data_.size());
    }
    absl::MutexLock lock(&sensor_attributes_dynamic_mutex_);
    sensor_attributes_dynamic_.set_data_points_buffered(buffered_count);
    return;
  }

  int data_points_to_notify = 0;
  bool should_notify = false;
  {
    absl::MutexLock lock(&sensor_attributes_dynamic_mutex_);
    const int buffered_count =
        sensor_attributes_dynamic_.data_points_buffered() + 1;
    data_points_to_notify = sensor_attributes_dynamic_.data_points_to_notify();
    should_notify = data_points_to_notify == buffered_count;
    // A completed batch restarts the count; otherwise record the new total.
    sensor_attributes_dynamic_.set_data_points_buffered(
        should_notify ? 0 : buffered_count);
  }

  std::vector<std::shared_ptr<const SensorValue>> sensor_batch;
  {
    absl::MutexLock lock(&sensor_data_mutex_);
    sensor_data_.push_back(sensor_data);
    if (!should_notify) {
      return;
    }
    // Never step back further than the buffer actually holds: moving an
    // iterator before begin() is undefined behavior.
    const int available = static_cast<int>(sensor_data_.size());
    const int batch_length = std::min(data_points_to_notify, available);
    // Copy the most recent `batch_length` entries for the callback.
    sensor_batch.assign(sensor_data_.end() - batch_length, sensor_data_.end());
  }
  (*notification_cb_)(std::move(sensor_batch));
}

// Replaces the state stored in the sensor's dynamic attributes with `state`,
// moving from the argument. The update is done while holding
// `sensor_attributes_dynamic_mutex_`.
void Sensor::UpdateState(State &&state) {
  absl::MutexLock dynamic_attributes_lock(&sensor_attributes_dynamic_mutex_);
  State *stored_state = sensor_attributes_dynamic_.mutable_state();
  *stored_state = std::move(state);
}

}  // namespace milotic_tlbmc
