blob: b9a3474c287e76e3d7d642f2a565de29a340498b [file] [log] [blame]
#include "tlbmc/sensors/sensor.h"
#include <algorithm>
#include <chrono> // NOLINT
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>
#include "google/protobuf/duration.pb.h"
#include "absl/base/thread_annotations.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/strings/substitute.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "g3/macros.h"
#include "entity_common_config.pb.h"
#include "i2c_common_config.pb.h"
#include "reading_range_config.pb.h"
#include "reading_transform_config.pb.h"
#include "tlbmc/configs/subscription_config.h"
#include "threshold_config.pb.h"
#include "resource.pb.h"
#include "sensor.pb.h"
#include "tlbmc/time/time.h"
namespace milotic_tlbmc {
namespace {
// Converts a nanosecond count into a google::protobuf::Duration by splitting
// it into whole seconds and the remaining sub-second nanoseconds.
google::protobuf::Duration GetDurationFromNanoseconds(uint64_t nanoseconds) {
  constexpr uint64_t kNanosPerSecond = 1000000000;
  const uint64_t whole_seconds = nanoseconds / kNanosPerSecond;
  const uint64_t leftover_nanos = nanoseconds % kNanosPerSecond;

  google::protobuf::Duration result;
  result.set_seconds(static_cast<int64_t>(whole_seconds));
  // The remainder is always below one second, so it fits in an int32.
  result.set_nanos(static_cast<int32_t>(leftover_nanos));
  return result;
}
// Converts a millisecond count into a google::protobuf::Duration: whole
// seconds go into `seconds`, the leftover milliseconds are scaled to
// nanoseconds and stored in `nanos`.
google::protobuf::Duration GetDurationFromMilliseconds(uint64_t milliseconds) {
  constexpr uint64_t kMillisPerSecond = 1000;
  constexpr int32_t kNanosPerMilli = 1000000;
  const uint64_t whole_seconds = milliseconds / kMillisPerSecond;
  // Always in [0, 999], so the multiplication below cannot overflow int32.
  const auto leftover_millis =
      static_cast<int32_t>(milliseconds % kMillisPerSecond);

  google::protobuf::Duration result;
  result.set_seconds(static_cast<int64_t>(whole_seconds));
  result.set_nanos(leftover_millis * kNanosPerMilli);
  return result;
}
} // namespace
// Returns `sensor_name` with everything up to and including the first '_'
// removed and every remaining '_' replaced by a space.
//
// If `sensor_name` contains no '_' at all, an empty string is returned.
// NOTE(review): returning "" for underscore-free names looks surprising;
// confirm against callers whether that is intended.
std::string GetTrimmedSensorName(absl::string_view sensor_name) {
  const size_t first_underscore = sensor_name.find('_');
  if (first_underscore == absl::string_view::npos) {
    return std::string();
  }
  std::string trimmed(sensor_name.substr(first_underscore + 1));
  std::replace(trimmed.begin(), trimmed.end(), '_', ' ');
  return trimmed;
}
// Builds the SensorAttributesStatic message for a sensor.
//
// The sensor name becomes the attributes key, the refresh mode is set to
// REFRESH_MODE_PERIODIC, and the refresh interval from `entity_common_config`
// is stored both in the refresh policy and in `static_refresh_interval`.
// The remaining config messages are copied into the result verbatim.
SensorAttributesStatic Sensor::CreateStaticAttributes(
    absl::string_view sensor_name, const SensorUnit& sensor_unit,
    const I2cCommonConfig& i2c_common_config,
    const EntityCommonConfig& entity_common_config,
    const ThresholdConfigs& threshold_configs,
    const ReadingRangeConfigs& reading_range_configs,
    const ReadingTransformConfig& reading_transform_config) {
  SensorAttributesStatic result;
  result.set_unit(sensor_unit);

  // Key and refresh policy live under the nested `attributes` message.
  auto* attributes = result.mutable_attributes();
  attributes->set_key(sensor_name);
  auto* refresh_policy = attributes->mutable_refresh_policy();
  refresh_policy->set_refresh_mode(REFRESH_MODE_PERIODIC);
  *refresh_policy->mutable_refresh_interval() =
      entity_common_config.refresh_interval();

  // Copy the configuration messages as-is.
  *result.mutable_i2c_common_config() = i2c_common_config;
  *result.mutable_entity_common_config() = entity_common_config;
  *result.mutable_thresholds() = threshold_configs;
  *result.mutable_reading_ranges() = reading_range_configs;
  *result.mutable_reading_transform() = reading_transform_config;

  // Second copy of the configured refresh interval, kept alongside the one
  // in the refresh policy.
  *result.mutable_static_refresh_interval() =
      entity_common_config.refresh_interval();
  return result;
}
// Returns the buffered sensor values whose timestamp is not earlier than
// `start_time`, or an empty vector when there are none.
//
// Holds `sensor_data_mutex_` for the duration of the call. The lookup is a
// binary search, so it presumes `sensor_data_` is ordered by ascending
// timestamp.
std::vector<std::shared_ptr<const SensorValue>>
Sensor::GetSensorDataHistorySince(absl::Time start_time) const
    ABSL_LOCKS_EXCLUDED(sensor_data_mutex_) {
  absl::MutexLock lock(&sensor_data_mutex_);
  // First element that is NOT strictly older than `start_time`.
  const auto first_recent = std::partition_point(
      sensor_data_.begin(), sensor_data_.end(),
      [start_time](const std::shared_ptr<const SensorValue>& sample) {
        return DecodeGoogleApiProto(sample->timestamp()) < start_time;
      });
  // An empty [first_recent, end) range naturally yields an empty vector.
  return std::vector<std::shared_ptr<const SensorValue>>(first_recent,
                                                         sensor_data_.end());
}
// Overwrites the refresh interval in the dynamic sensor attributes with
// `new_sampling_interval`, under `sensor_attributes_dynamic_mutex_`.
void Sensor::UpdateSensorSamplingInterval(
    absl::Duration new_sampling_interval) {
  // NOLINTNEXTLINE: Yocto's Abseil doesn't support non-pointer constructor.
  absl::MutexLock lock(&sensor_attributes_dynamic_mutex_);
  auto* refresh_interval =
      sensor_attributes_dynamic_.mutable_refresh_interval();
  *refresh_interval = EncodeGoogleApiProto(new_sampling_interval);
}
// Registers `subscription_params` as a subscriber of this sensor.
//
// Returns the error from GetBatchSizeForSubscription() if the batch size
// cannot be determined, and InvalidArgument if the batch size exceeds the
// queue size from the sensor's entity common config. Otherwise the
// subscription is recorded and OK is returned.
//
// The notification batch size, refresh interval and buffered-point counter
// are (re)initialized from this subscription when either no batch size is
// currently set, or this subscription ends up first in
// `subscription_params_`.
absl::Status Sensor::Subscribe(const SubscriptionParams* subscription_params) {
  ECCLESIA_ASSIGN_OR_RETURN(int batch_size,
                            subscription_params->GetBatchSizeForSubscription());
  const uint64_t max_queue_size =
      sensor_attributes_static_.entity_common_config().queue_size();
  // The cast spells out the conversion the mixed int/uint64_t comparison
  // performs anyway.
  if (static_cast<uint64_t>(batch_size) > max_queue_size) {
    return absl::InvalidArgumentError(absl::Substitute(
        "Max queue size $0 is smaller than the requested batch size $1",
        max_queue_size, batch_size));
  }

  absl::MutexLock lock(&sensor_attributes_dynamic_mutex_);
  subscription_params_.insert(subscription_params);

  // Nothing more to do when a batch size is already active and the new
  // subscription did not become the first entry of the set.
  if (sensor_attributes_dynamic_.data_points_to_notify() != 0 &&
      *subscription_params_.begin() != subscription_params) {
    return absl::OkStatus();
  }

  // This subscription now drives notifications: adopt its batch size and
  // sampling interval.
  sensor_attributes_dynamic_.set_data_points_to_notify(batch_size);
  *sensor_attributes_dynamic_.mutable_refresh_interval() =
      GetDurationFromNanoseconds(subscription_params->sample_interval_ns);
  // Start counting from scratch; the new batch size may be smaller than the
  // number of points buffered so far.
  sensor_attributes_dynamic_.set_data_points_buffered(0);
  return absl::OkStatus();
}
// Removes `subscription_params` from this sensor's subscriptions.
//
// Returns NotFound when there are no subscriptions at all. The result of the
// erase itself is not checked, so removing a subscription that was never
// registered still returns OK as long as other subscriptions exist.
//
// After removal, the notification batch size is set to 0 when no
// subscriptions remain, otherwise to the batch size of the first remaining
// subscription (an error from GetBatchSizeForSubscription() is propagated).
absl::Status Sensor::Unsubscribe(
    const SubscriptionParams* subscription_params) {
  absl::MutexLock lock(&sensor_attributes_dynamic_mutex_);
  if (subscription_params_.empty()) {
    return absl::NotFoundError("No subscriptions to unsubscribe from.");
  }
  subscription_params_.erase(subscription_params);

  if (!subscription_params_.empty()) {
    // Fall back to the batch size of the subscription that is now first.
    ECCLESIA_ASSIGN_OR_RETURN(
        int next_batch_size,
        (*subscription_params_.begin())->GetBatchSizeForSubscription());
    sensor_attributes_dynamic_.set_data_points_to_notify(next_batch_size);
    return absl::OkStatus();
  }

  // Last subscriber gone: disable batch notifications.
  sensor_attributes_dynamic_.set_data_points_to_notify(0);
  return absl::OkStatus();
}
// Appends `sensor_data` to the sensor's data buffer and, when a notification
// callback is installed and a full batch has accumulated, invokes the
// callback with the most recent batch.
//
// Without a callback, the sample is buffered and the dynamic attribute
// `data_points_buffered` is set to the buffer's current size.
//
// With a callback, `data_points_buffered` is incremented; once it equals
// `data_points_to_notify` it is reset to 0 and the newest
// `data_points_to_notify` samples are copied out and passed to the callback.
// The callback runs after both mutexes have been released.
void Sensor::StoreSensorData(
    const std::shared_ptr<const SensorValue>& sensor_data) {
  if (notification_cb_ == std::nullopt) {
    // NOLINTNEXTLINE: Yocto's Abseil is old and only supports pointer type
    absl::MutexLock data_lock(&sensor_data_mutex_);
    sensor_data_.push_back(sensor_data);
    const int data_points_buffered = static_cast<int>(sensor_data_.size());
    // NOLINTNEXTLINE: Yocto's Abseil is old and only supports pointer type
    absl::MutexLock attributes_lock(&sensor_attributes_dynamic_mutex_);
    sensor_attributes_dynamic_.set_data_points_buffered(data_points_buffered);
    return;
  }

  int data_points_to_notify = 0;
  bool should_notify = false;
  {
    absl::MutexLock attributes_lock(&sensor_attributes_dynamic_mutex_);
    const int data_points_buffered =
        sensor_attributes_dynamic_.data_points_buffered() + 1;
    data_points_to_notify = sensor_attributes_dynamic_.data_points_to_notify();
    // A callback is known to be installed here (checked above), so only the
    // batch-complete condition matters.
    should_notify = data_points_to_notify == data_points_buffered;
    // A completed batch restarts the count; otherwise record the new total.
    sensor_attributes_dynamic_.set_data_points_buffered(
        should_notify ? 0 : data_points_buffered);
  }

  std::vector<std::shared_ptr<const SensorValue>> sensor_batch;
  {
    absl::MutexLock data_lock(&sensor_data_mutex_);
    sensor_data_.push_back(sensor_data);
    if (!should_notify) {
      return;
    }
    // Take the newest samples, but never more than the buffer holds:
    // stepping back further than size() from end() would move the iterator
    // before begin(), which is undefined behavior (possible e.g. if the
    // buffer capacity is smaller than the batch size).
    const size_t batch_size = std::min<size_t>(
        static_cast<size_t>(data_points_to_notify), sensor_data_.size());
    sensor_batch.reserve(batch_size);
    std::copy(sensor_data_.end() - static_cast<std::ptrdiff_t>(batch_size),
              sensor_data_.end(), std::back_inserter(sensor_batch));
  }
  (*notification_cb_)(std::move(sensor_batch));
}
// Replaces the state stored in the dynamic sensor attributes with `state`
// (moved in), under `sensor_attributes_dynamic_mutex_`.
void Sensor::UpdateState(State&& state) {
  absl::MutexLock lock(&sensor_attributes_dynamic_mutex_);
  auto* current_state = sensor_attributes_dynamic_.mutable_state();
  *current_state = std::move(state);
}
// Sets the capacity of the sensor data buffer to `buffer_size` while holding
// `sensor_data_mutex_`, logging the new size at WARNING level.
void Sensor::ResizeBuffer(size_t buffer_size) {
  absl::MutexLock lock(&sensor_data_mutex_);
  LOG(WARNING) << "Resize buffer to " << buffer_size;
  // NOTE(review): presumably shrinking below the current size drops stored
  // samples; confirm against the buffer type's set_capacity() contract.
  sensor_data_.set_capacity(buffer_size);
}
// Clears all accumulated polling metrics: every total and sample count goes
// back to zero and both refresh timestamps are set to the minimum
// steady_clock time point. Guarded by `sensor_metrics_mutex_`.
void Sensor::ResetMetrics() {
  // NOLINTNEXTLINE: Yocto's Abseil doesn't support non-pointer constructor.
  absl::MutexLock lock(&sensor_metrics_mutex_);

  // Hardware read latency.
  total_hardware_read_cnt_ = 0;
  total_hardware_latency_ms_ = 0;

  // Software polling intervals, measured at poll start and at poll end.
  total_software_polling_start_cnt_ = 0;
  total_software_polling_start_interval_ms_ = 0;
  total_software_polling_end_cnt_ = 0;
  total_software_polling_end_interval_ms_ = 0;

  // Refresh timestamps.
  const auto kMinTimePoint = std::chrono::steady_clock::time_point::min();
  last_refresh_start_time_ = kMinTimePoint;
  last_refresh_end_time_ = kMinTimePoint;
}
// Returns the average hardware polling latency and the average software
// polling start/end intervals, each computed as accumulated milliseconds
// divided by the number of samples (integer division).
//
// If any of the three sample counts is zero, all three averages are reported
// as zero, even when the other counters have data.
SensorMetrics Sensor::GetSensorMetrics() const {
  // NOLINTNEXTLINE: Yocto's Abseil doesn't support non-pointer constructor.
  absl::MutexLock lock(&sensor_metrics_mutex_);
  SensorMetrics result;

  const bool every_counter_has_samples = total_hardware_read_cnt_ != 0 &&
                                         total_software_polling_start_cnt_ != 0 &&
                                         total_software_polling_end_cnt_ != 0;
  if (!every_counter_has_samples) {
    result.mutable_average_hardware_polling_latency()->set_nanos(0);
    result.mutable_average_software_polling_start_interval()->set_nanos(0);
    result.mutable_average_software_polling_end_interval()->set_nanos(0);
    return result;
  }

  // All divisors are non-zero from here on.
  const uint64_t hardware_latency_avg_ms =
      total_hardware_latency_ms_ / total_hardware_read_cnt_;
  const uint64_t polling_start_avg_ms =
      total_software_polling_start_interval_ms_ /
      total_software_polling_start_cnt_;
  const uint64_t polling_end_avg_ms =
      total_software_polling_end_interval_ms_ / total_software_polling_end_cnt_;

  *result.mutable_average_hardware_polling_latency() =
      GetDurationFromMilliseconds(hardware_latency_avg_ms);
  *result.mutable_average_software_polling_start_interval() =
      GetDurationFromMilliseconds(polling_start_avg_ms);
  *result.mutable_average_software_polling_end_interval() =
      GetDurationFromMilliseconds(polling_end_avg_ms);
  return result;
}
// Adds one hardware read of duration `latency` to the hardware polling
// metrics, under `sensor_metrics_mutex_`.
//
// The latency is accumulated in whole milliseconds (truncated), with a
// minimum of 1 ms per sample so the running total never stays at zero.
void Sensor::UpdateHardwarePollingMetrics(
    std::chrono::steady_clock::duration latency) {
  // NOLINTNEXTLINE: Yocto's Abseil doesn't support non-pointer constructor.
  absl::MutexLock lock(&sensor_metrics_mutex_);
  // duration_cast converts correctly whatever the steady_clock tick period
  // is, instead of assuming that count() is in nanoseconds.
  const std::chrono::milliseconds latency_ms =
      std::max(std::chrono::duration_cast<std::chrono::milliseconds>(latency),
               std::chrono::milliseconds(1));
  total_hardware_latency_ms_ += static_cast<uint64_t>(latency_ms.count());
  total_hardware_read_cnt_++;
}
// Adds one polling-start `interval` sample to the software polling start
// metrics, under `sensor_metrics_mutex_`.
//
// The interval is accumulated in whole milliseconds (truncated), with a
// minimum of 1 ms per sample so the running total never stays at zero.
void Sensor::UpdateSoftwarePollingStartMetrics(
    std::chrono::steady_clock::duration interval) {
  // NOLINTNEXTLINE: Yocto's Abseil doesn't support non-pointer constructor.
  absl::MutexLock lock(&sensor_metrics_mutex_);
  // duration_cast converts correctly whatever the steady_clock tick period
  // is, instead of assuming that count() is in nanoseconds.
  const std::chrono::milliseconds interval_ms =
      std::max(std::chrono::duration_cast<std::chrono::milliseconds>(interval),
               std::chrono::milliseconds(1));
  total_software_polling_start_interval_ms_ +=
      static_cast<uint64_t>(interval_ms.count());
  total_software_polling_start_cnt_++;
}
// Adds one polling-end `interval` sample to the software polling end
// metrics, under `sensor_metrics_mutex_`.
//
// The interval is accumulated in whole milliseconds (truncated), with a
// minimum of 1 ms per sample so the running total never stays at zero.
void Sensor::UpdateSoftwarePollingEndMetrics(
    std::chrono::steady_clock::duration interval) {
  // NOLINTNEXTLINE: Yocto's Abseil doesn't support non-pointer constructor.
  absl::MutexLock lock(&sensor_metrics_mutex_);
  // duration_cast converts correctly whatever the steady_clock tick period
  // is, instead of assuming that count() is in nanoseconds.
  const std::chrono::milliseconds interval_ms =
      std::max(std::chrono::duration_cast<std::chrono::milliseconds>(interval),
               std::chrono::milliseconds(1));
  total_software_polling_end_interval_ms_ +=
      static_cast<uint64_t>(interval_ms.count());
  total_software_polling_end_cnt_++;
}
} // namespace milotic_tlbmc