| #include "tlbmc/sensors/sensor.h" |
| |
| #include <algorithm> |
| #include <cstddef> |
| #include <cstdint> |
| #include <iterator> |
| #include <memory> |
| #include <optional> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "google/protobuf/duration.pb.h" |
| #include "absl/base/thread_annotations.h" |
| #include "absl/log/log.h" |
| #include "absl/status/status.h" |
| #include "absl/strings/string_view.h" |
| #include "absl/strings/substitute.h" |
| #include "absl/synchronization/mutex.h" |
| #include "absl/time/time.h" |
| #include "g3/macros.h" |
| #include "entity_common_config.pb.h" |
| #include "i2c_common_config.pb.h" |
| #include "reading_range_config.pb.h" |
| #include "reading_transform_config.pb.h" |
| #include "tlbmc/configs/subscription_config.h" |
| #include "threshold_config.pb.h" |
| #include "resource.pb.h" |
| #include "sensor.pb.h" |
| #include "tlbmc/time/time.h" |
| |
| namespace milotic_tlbmc { |
| |
| namespace { |
| |
| google::protobuf::Duration GetDurationFromNanoseconds(uint64_t nanoseconds) { |
| google::protobuf::Duration duration; |
| duration.set_seconds(static_cast<int64_t>(nanoseconds / 1000000000)); |
| duration.set_nanos(static_cast<int32_t>(nanoseconds % 1000000000)); |
| return duration; |
| } |
| |
| } // namespace |
| |
| std::string GetTrimmedSensorName(absl::string_view sensor_name) { |
| std::string stripped_sensor_name; |
| if (uint64_t under_pos = sensor_name.find('_'); |
| under_pos != std::string::npos) { |
| stripped_sensor_name = sensor_name.substr(under_pos + 1); |
| } |
| std::replace(stripped_sensor_name.begin(), stripped_sensor_name.end(), '_', |
| ' '); |
| return stripped_sensor_name; |
| } |
| |
| SensorAttributesStatic Sensor::CreateStaticAttributes( |
| absl::string_view sensor_name, const SensorUnit &sensor_unit, |
| const I2cCommonConfig &i2c_common_config, |
| const EntityCommonConfig &entity_common_config, |
| const ThresholdConfigs &threshold_configs, |
| const ReadingRangeConfigs &reading_range_configs, |
| const ReadingTransformConfig &reading_transform_config) { |
| SensorAttributesStatic sensor_attributes_static; |
| sensor_attributes_static.set_unit(sensor_unit); |
| sensor_attributes_static.mutable_attributes()->set_key(sensor_name); |
| sensor_attributes_static.mutable_attributes() |
| ->mutable_refresh_policy() |
| ->set_refresh_mode(REFRESH_MODE_PERIODIC); |
| *sensor_attributes_static.mutable_attributes() |
| ->mutable_refresh_policy() |
| ->mutable_refresh_interval() = entity_common_config.refresh_interval(); |
| *sensor_attributes_static.mutable_i2c_common_config() = i2c_common_config; |
| *sensor_attributes_static.mutable_entity_common_config() = |
| entity_common_config; |
| *sensor_attributes_static.mutable_thresholds() = threshold_configs; |
| *sensor_attributes_static.mutable_reading_ranges() = reading_range_configs; |
| *sensor_attributes_static.mutable_reading_transform() = |
| reading_transform_config; |
| *sensor_attributes_static.mutable_static_refresh_interval() = |
| entity_common_config.refresh_interval(); |
| return sensor_attributes_static; |
| } |
| |
| std::vector<std::shared_ptr<const SensorValue>> |
| Sensor::GetSensorDataHistorySince(absl::Time start_time) const |
| ABSL_LOCKS_EXCLUDED(sensor_data_mutex_) { |
| absl::MutexLock lock(&sensor_data_mutex_); |
| auto it = std::lower_bound( |
| sensor_data_.begin(), sensor_data_.end(), start_time, |
| [](const std::shared_ptr<const SensorValue> &sensor_data, |
| absl::Time start_time) { |
| return DecodeGoogleApiProto(sensor_data->timestamp()) < start_time; |
| }); |
| if (it == sensor_data_.end()) { |
| return {}; |
| } |
| return {it, sensor_data_.end()}; |
| } |
| |
| // Subscribes to the sensor data. |
| absl::Status Sensor::Subscribe(const SubscriptionParams *subscription_params) { |
| ECCLESIA_ASSIGN_OR_RETURN(int batch_size, |
| subscription_params->GetBatchSizeForSubscription()); |
| |
| uint64_t max_queue_size = |
| sensor_attributes_static_.entity_common_config().queue_size(); |
| if (batch_size > max_queue_size) { |
| return absl::InvalidArgumentError(absl::Substitute( |
| "Max queue size $0 is smaller than the requested batch size $1", |
| max_queue_size, batch_size)); |
| } |
| |
| absl::MutexLock lock(&sensor_attributes_dynamic_mutex_); |
| subscription_params_.insert(subscription_params); |
| |
| // If the new batch size is smallest, we need to reset the current batch size |
| // used for the notification. |
| if (sensor_attributes_dynamic_.data_points_to_notify() == 0 || |
| *subscription_params_.begin() == subscription_params) { |
| sensor_attributes_dynamic_.set_data_points_to_notify(batch_size); |
| |
| // Set the refresh interval for the sensor based on the subscription |
| // parameters. |
| *sensor_attributes_dynamic_.mutable_refresh_interval() = |
| GetDurationFromNanoseconds(subscription_params->sample_interval_ns); |
| |
| // Reset the current batch size. This is needed in case the new batch size |
| // is smaller than the current one. |
| sensor_attributes_dynamic_.set_data_points_buffered(0); |
| } |
| return absl::OkStatus(); |
| } |
| |
| // Deletes an existing subscription. |
| absl::Status Sensor::Unsubscribe( |
| const SubscriptionParams *subscription_params) { |
| absl::MutexLock lock(&sensor_attributes_dynamic_mutex_); |
| if (subscription_params_.empty()) { |
| return absl::NotFoundError("No subscriptions to unsubscribe from."); |
| } |
| subscription_params_.erase(subscription_params); |
| if (subscription_params_.empty()) { |
| sensor_attributes_dynamic_.set_data_points_to_notify(0); |
| return absl::OkStatus(); |
| } |
| |
| ECCLESIA_ASSIGN_OR_RETURN( |
| int batch_size, |
| (*subscription_params_.begin())->GetBatchSizeForSubscription()); |
| sensor_attributes_dynamic_.set_data_points_to_notify(batch_size); |
| return absl::OkStatus(); |
| } |
| |
| void Sensor::StoreSensorData( |
| const std::shared_ptr<const SensorValue> &sensor_data) { |
| if (notification_cb_ == std::nullopt) { |
| int data_points_buffered = 0; |
| { |
| absl::MutexLock lock(&sensor_data_mutex_); |
| sensor_data_.push_back(sensor_data); |
| data_points_buffered = static_cast<int>(sensor_data_.size()); |
| } |
| { |
| absl::MutexLock lock(&sensor_attributes_dynamic_mutex_); |
| sensor_attributes_dynamic_.set_data_points_buffered(data_points_buffered); |
| } |
| return; |
| } |
| int data_points_to_notify = 0; |
| bool should_notify = false; |
| { |
| absl::MutexLock sensor_attributes_dynamic_lock( |
| &sensor_attributes_dynamic_mutex_); |
| int data_points_buffered = |
| sensor_attributes_dynamic_.data_points_buffered() + 1; |
| data_points_to_notify = sensor_attributes_dynamic_.data_points_to_notify(); |
| should_notify = notification_cb_ != std::nullopt && |
| data_points_to_notify == data_points_buffered; |
| // If we should notify, reset the data points buffered |
| should_notify ? sensor_attributes_dynamic_.set_data_points_buffered(0) |
| : sensor_attributes_dynamic_.set_data_points_buffered( |
| data_points_buffered); |
| } |
| |
| std::vector<std::shared_ptr<const SensorValue>> sensor_batch; |
| { |
| absl::MutexLock lock(&sensor_data_mutex_); |
| sensor_data_.push_back(sensor_data); |
| if (!should_notify) { |
| return; |
| } |
| // Get the last batch of data from the buffer. |
| auto it = sensor_data_.end() - static_cast<int>(data_points_to_notify); |
| // Prepare the batch to be sent to the callback. |
| sensor_batch.reserve(data_points_to_notify); |
| std::copy(it, sensor_data_.end(), std::back_inserter(sensor_batch)); |
| } |
| (*notification_cb_)(std::move(sensor_batch)); |
| } |
| |
| void Sensor::UpdateState(State &&state) { |
| absl::MutexLock lock(&sensor_attributes_dynamic_mutex_); |
| *sensor_attributes_dynamic_.mutable_state() = std::move(state); |
| } |
| |
| void Sensor::ResizeBuffer(size_t buffer_size) { |
| absl::MutexLock lock(&sensor_data_mutex_); |
| LOG(WARNING) << "Resize buffer to " << buffer_size; |
| sensor_data_.set_capacity(buffer_size); |
| } |
| |
| } // namespace milotic_tlbmc |