blob: 63e750a1d6fbcf8373b6bb0f1d8a3a6520a6185c [file] [log] [blame]
#include "tlbmc/sensors/sensor.h"
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>
#include "google/protobuf/duration.pb.h"
#include "absl/base/thread_annotations.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/strings/substitute.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "g3/macros.h"
#include "entity_common_config.pb.h"
#include "i2c_common_config.pb.h"
#include "reading_range_config.pb.h"
#include "reading_transform_config.pb.h"
#include "tlbmc/configs/subscription_config.h"
#include "threshold_config.pb.h"
#include "resource.pb.h"
#include "sensor.pb.h"
#include "tlbmc/time/time.h"
namespace milotic_tlbmc {
namespace {
// Builds a google::protobuf::Duration from a nanosecond count by splitting it
// into whole seconds and the sub-second nanosecond remainder.
google::protobuf::Duration GetDurationFromNanoseconds(uint64_t nanoseconds) {
  constexpr uint64_t kNanosPerSecond = 1000000000;
  const uint64_t whole_seconds = nanoseconds / kNanosPerSecond;
  const uint64_t leftover_nanos = nanoseconds % kNanosPerSecond;

  google::protobuf::Duration result;
  result.set_seconds(static_cast<int64_t>(whole_seconds));
  result.set_nanos(static_cast<int32_t>(leftover_nanos));
  return result;
}
} // namespace
// Returns the part of `sensor_name` that follows its first underscore, with
// every remaining underscore replaced by a space. When `sensor_name` contains
// no underscore at all, an empty string is returned.
std::string GetTrimmedSensorName(absl::string_view sensor_name) {
  const auto first_underscore = sensor_name.find('_');
  if (first_underscore == std::string::npos) {
    return std::string();
  }

  // Drop everything up to and including the first underscore.
  std::string trimmed(sensor_name.substr(first_underscore + 1));
  for (char &ch : trimmed) {
    if (ch == '_') {
      ch = ' ';
    }
  }
  return trimmed;
}
// Assembles the static attributes of a sensor from its name, unit and
// configuration protos.
//
// The sensor name is stored as the attributes key, the refresh mode is set to
// REFRESH_MODE_PERIODIC, and the refresh interval from `entity_common_config`
// is copied both into the refresh policy and into the static refresh interval.
// Every other config is copied verbatim into the result.
SensorAttributesStatic Sensor::CreateStaticAttributes(
    absl::string_view sensor_name, const SensorUnit &sensor_unit,
    const I2cCommonConfig &i2c_common_config,
    const EntityCommonConfig &entity_common_config,
    const ThresholdConfigs &threshold_configs,
    const ReadingRangeConfigs &reading_range_configs,
    const ReadingTransformConfig &reading_transform_config) {
  SensorAttributesStatic static_attrs;
  static_attrs.set_unit(sensor_unit);

  // Identity and refresh policy.
  auto *attributes = static_attrs.mutable_attributes();
  attributes->set_key(sensor_name);
  auto *refresh_policy = attributes->mutable_refresh_policy();
  refresh_policy->set_refresh_mode(REFRESH_MODE_PERIODIC);
  *refresh_policy->mutable_refresh_interval() =
      entity_common_config.refresh_interval();

  // Plain copies of the supplied configuration.
  *static_attrs.mutable_i2c_common_config() = i2c_common_config;
  *static_attrs.mutable_entity_common_config() = entity_common_config;
  *static_attrs.mutable_thresholds() = threshold_configs;
  *static_attrs.mutable_reading_ranges() = reading_range_configs;
  *static_attrs.mutable_reading_transform() = reading_transform_config;
  *static_attrs.mutable_static_refresh_interval() =
      entity_common_config.refresh_interval();
  return static_attrs;
}
// Returns the buffered readings from the first one whose timestamp is not
// earlier than `start_time` through the end of the buffer. The result is empty
// when no such reading exists.
//
// The search is a binary search, so it assumes the buffer is ordered by
// timestamp (TODO confirm against the producers of sensor data).
std::vector<std::shared_ptr<const SensorValue>>
Sensor::GetSensorDataHistorySince(absl::Time start_time) const
    ABSL_LOCKS_EXCLUDED(sensor_data_mutex_) {
  absl::MutexLock lock(&sensor_data_mutex_);

  const auto is_older_than =
      [](const std::shared_ptr<const SensorValue> &entry, absl::Time cutoff) {
        return DecodeGoogleApiProto(entry->timestamp()) < cutoff;
      };
  const auto buffer_end = sensor_data_.end();
  const auto first_match = std::lower_bound(sensor_data_.begin(), buffer_end,
                                            start_time, is_older_than);

  std::vector<std::shared_ptr<const SensorValue>> history;
  if (first_match != buffer_end) {
    history.assign(first_match, buffer_end);
  }
  return history;
}
// Registers a subscription to this sensor's data.
//
// Fails with InvalidArgument when the batch size requested by
// `subscription_params` exceeds the queue size configured in the sensor's
// entity common config, and propagates any error from
// GetBatchSizeForSubscription(). On success the subscription is recorded; if
// no batch size is currently active, or the new subscription ends up first in
// `subscription_params_` (presumably ordered by ascending batch size -- verify
// against the container's comparator), the notification batch size and refresh
// interval are switched over to it and the buffered-points counter restarts.
absl::Status Sensor::Subscribe(const SubscriptionParams *subscription_params) {
  ECCLESIA_ASSIGN_OR_RETURN(int batch_size,
                            subscription_params->GetBatchSizeForSubscription());
  const uint64_t max_queue_size =
      sensor_attributes_static_.entity_common_config().queue_size();
  if (batch_size > max_queue_size) {
    return absl::InvalidArgumentError(absl::Substitute(
        "Max queue size $0 is smaller than the requested batch size $1",
        max_queue_size, batch_size));
  }

  absl::MutexLock lock(&sensor_attributes_dynamic_mutex_);
  subscription_params_.insert(subscription_params);

  // Nothing else to do when another subscription already drives the
  // notification batch size and the new one did not take the front position.
  if (sensor_attributes_dynamic_.data_points_to_notify() != 0 &&
      *subscription_params_.begin() != subscription_params) {
    return absl::OkStatus();
  }

  // The new subscription now determines how many points trigger a
  // notification and how often the sensor is sampled.
  sensor_attributes_dynamic_.set_data_points_to_notify(batch_size);
  *sensor_attributes_dynamic_.mutable_refresh_interval() =
      GetDurationFromNanoseconds(subscription_params->sample_interval_ns);
  // Restart counting buffered points so a smaller batch size cannot be
  // overshot by points that were counted under the previous one.
  sensor_attributes_dynamic_.set_data_points_buffered(0);
  return absl::OkStatus();
}
// Removes a subscription from this sensor.
//
// Returns NotFound when there are no subscriptions at all. Otherwise
// `subscription_params` is erased from the set of subscriptions and the
// notification batch size is recomputed: it becomes the batch size of the
// subscription now at the front of the set, or 0 when none remain. An error
// from GetBatchSizeForSubscription() is propagated to the caller.
absl::Status Sensor::Unsubscribe(
    const SubscriptionParams *subscription_params) {
  absl::MutexLock lock(&sensor_attributes_dynamic_mutex_);
  if (subscription_params_.empty()) {
    return absl::NotFoundError("No subscriptions to unsubscribe from.");
  }

  subscription_params_.erase(subscription_params);
  if (!subscription_params_.empty()) {
    // Hand control of the batch size to the subscription that is now first.
    ECCLESIA_ASSIGN_OR_RETURN(
        int next_batch_size,
        (*subscription_params_.begin())->GetBatchSizeForSubscription());
    sensor_attributes_dynamic_.set_data_points_to_notify(next_batch_size);
    return absl::OkStatus();
  }

  // That was the last subscription: there is nothing left to notify.
  sensor_attributes_dynamic_.set_data_points_to_notify(0);
  return absl::OkStatus();
}
// Appends `sensor_data` to the sensor's data buffer and, when a notification
// callback is installed and a full batch has been collected, hands the most
// recent batch to that callback.
//
// Without a callback the reading is only buffered and the buffered-points
// counter is set to the buffer's current size. With a callback the counter is
// incremented per reading and reset to 0 each time it reaches the configured
// number of points to notify. The callback is invoked after both mutexes have
// been released.
void Sensor::StoreSensorData(
    const std::shared_ptr<const SensorValue> &sensor_data) {
  if (notification_cb_ == std::nullopt) {
    int data_points_buffered = 0;
    {
      absl::MutexLock lock(&sensor_data_mutex_);
      sensor_data_.push_back(sensor_data);
      data_points_buffered = static_cast<int>(sensor_data_.size());
    }
    {
      absl::MutexLock lock(&sensor_attributes_dynamic_mutex_);
      sensor_attributes_dynamic_.set_data_points_buffered(
          data_points_buffered);
    }
    return;
  }

  int data_points_to_notify = 0;
  bool should_notify = false;
  {
    absl::MutexLock sensor_attributes_dynamic_lock(
        &sensor_attributes_dynamic_mutex_);
    const int data_points_buffered =
        sensor_attributes_dynamic_.data_points_buffered() + 1;
    data_points_to_notify = sensor_attributes_dynamic_.data_points_to_notify();
    should_notify = notification_cb_ != std::nullopt &&
                    data_points_to_notify == data_points_buffered;
    // A completed batch restarts the count; otherwise record the new total.
    sensor_attributes_dynamic_.set_data_points_buffered(
        should_notify ? 0 : data_points_buffered);
  }

  std::vector<std::shared_ptr<const SensorValue>> sensor_batch;
  {
    absl::MutexLock lock(&sensor_data_mutex_);
    sensor_data_.push_back(sensor_data);
    if (!should_notify) {
      return;
    }
    // Never reach back further than the buffer actually holds. The counter and
    // the buffer are guarded by different mutexes, and the buffer can contain
    // fewer readings than the batch size (e.g. if ResizeBuffer() lowered its
    // capacity), in which case `end() - data_points_to_notify` would step
    // before begin().
    const size_t requested_points =
        data_points_to_notify > 0 ? static_cast<size_t>(data_points_to_notify)
                                  : 0;
    const size_t batch_size =
        std::min(requested_points, static_cast<size_t>(sensor_data_.size()));
    // Copy the most recent `batch_size` readings for the callback.
    sensor_batch.reserve(batch_size);
    std::copy(sensor_data_.end() - static_cast<std::ptrdiff_t>(batch_size),
              sensor_data_.end(), std::back_inserter(sensor_batch));
  }
  (*notification_cb_)(std::move(sensor_batch));
}
// Replaces the state stored in the sensor's dynamic attributes with `state`,
// which is moved from. The update happens under
// `sensor_attributes_dynamic_mutex_`.
void Sensor::UpdateState(State &&state) {
  absl::MutexLock dynamic_attributes_lock(&sensor_attributes_dynamic_mutex_);
  auto *stored_state = sensor_attributes_dynamic_.mutable_state();
  *stored_state = std::move(state);
}
// Sets the capacity of the sensor data buffer to `buffer_size` while holding
// `sensor_data_mutex_`, and logs the new size at WARNING severity.
void Sensor::ResizeBuffer(size_t buffer_size) {
  absl::MutexLock buffer_lock(&sensor_data_mutex_);
  LOG(WARNING) << "Resize buffer to " << buffer_size;
  sensor_data_.set_capacity(buffer_size);
}
} // namespace milotic_tlbmc