| // Copyright 2025 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| //! This module provides functionality for creating and managing I2C sensors for the telemetry_source_manager. |
| #![allow(clippy::type_complexity)] |
| |
| use crate::telemetry_source_manager::sensor_configs::{ |
| find_sensor_file, FruInfo, PSUProperty, Sensor as SensorConfig, |
| }; |
| use crate::telemetry_source_manager::telemetry_source_manager::{AsyncReadValue, TelemetrySource}; |
| use anyhow::Result; |
| use std::collections::HashMap; |
| use std::str::FromStr; |
| use std::sync::Arc; |
| use std::thread; |
| use std::time::{Duration, Instant, SystemTime}; |
| use tokio::sync::mpsc; |
| use tokio::sync::Mutex; |
| use tokio_uring::fs::File as UringFile; |
| |
| const BUFFER_SIZE: usize = 128; |
| |
| /// Represents an I2C sensor reader that implements AsyncReadValue for the telemetry_source_manager. |
| pub struct I2cSensorReader { |
| // This mutex is only for making this struct to have Send + Sync, as required by trait AsyncReadValue |
| // There should be no real contender for this lock in data path, unless the get_value and read_values |
| // are called at same time. |
| value_receiver: Arc<Mutex<mpsc::Receiver<Vec<(f64, SystemTime)>>>>, |
| #[allow(dead_code)] |
| psu_property: PSUProperty, |
| sampling_interval: Arc<std::sync::atomic::AtomicU64>, |
| reporting_interval: Arc<std::sync::atomic::AtomicU64>, |
| } |
| |
| impl I2cSensorReader { |
| /// Creates a new I2cSensorReader instance. |
| /// |
| /// # Arguments |
| /// |
| /// * `sysfs_path` - The sysfs path for the sensor. |
| /// * `psu_property` - The PSU property associated with the sensor. |
| /// * `sampling_interval` - The interval at which to sample the sensor. |
| /// * `reporting_interval` - The interval at which to report sensor values. |
| /// |
| /// # Returns |
| /// |
| /// A Result containing the new I2cSensorReader instance or an error. |
| |
| pub fn new( |
| sysfs_path: String, |
| psu_property: PSUProperty, |
| sampling_interval: Duration, |
| reporting_interval: Duration, |
| ) -> Result<Self> { |
| let (tx, rx) = mpsc::channel(16); |
| let value_receiver = Arc::new(Mutex::new(rx)); |
| let sampling_interval = Arc::new(std::sync::atomic::AtomicU64::new( |
| sampling_interval.as_millis() as u64, |
| )); |
| let reporting_interval = Arc::new(std::sync::atomic::AtomicU64::new( |
| reporting_interval.as_millis() as u64, |
| )); |
| |
| let sampling_interval_clone = sampling_interval.clone(); |
| let reporting_interval_clone = reporting_interval.clone(); |
| |
| thread::spawn(move || { |
| tokio_uring::start(async move { |
| let file = match UringFile::open(&sysfs_path).await { |
| Ok(f) => f, |
| Err(e) => { |
| eprintln!("Error opening file {}: {:?}", sysfs_path, e); |
| return; |
| } |
| }; |
| |
| let mut buf = vec![0u8; BUFFER_SIZE]; |
| let mut batch_values = Vec::new(); |
| let mut last_report_time = SystemTime::now(); |
| let factor = 10.0_f64.powi(psu_property.scale); |
| |
| loop { |
| let loop_start = Instant::now(); |
| |
| let (res, new_buf) = file.read_at(buf, 0).await; |
| buf = new_buf; // Reuse the buffer |
| |
| match res { |
| Ok(bytes_read) => { |
| if bytes_read == 0 { |
| continue; |
| } |
| |
| let s = std::str::from_utf8(&buf[..bytes_read]).unwrap_or_default(); |
| let raw_value = f64::from_str(s.trim()).unwrap_or_default(); |
| let final_value = (raw_value / factor) + psu_property.offset as f64; |
| |
| let now = SystemTime::now(); |
| batch_values.push((final_value, now)); |
| |
| if now |
| .duration_since(last_report_time) |
| .unwrap_or(Duration::ZERO) |
| >= Duration::from_millis( |
| reporting_interval_clone |
| .load(std::sync::atomic::Ordering::Relaxed), |
| ) |
| { |
| if tx.send(batch_values.clone()).await.is_err() { |
| // The receiver has been dropped, exit the thread |
| break; |
| } |
| batch_values.clear(); |
| last_report_time = now; |
| } |
| } |
| Err(e) => { |
| eprintln!("Error reading file {}: {:?}", sysfs_path, e); |
| } |
| } |
| |
| let elapsed = loop_start.elapsed(); |
| let intended_interval = Duration::from_millis( |
| sampling_interval_clone.load(std::sync::atomic::Ordering::Relaxed), |
| ); |
| |
| if elapsed < intended_interval { |
| let sleep_duration = intended_interval - elapsed; |
| tokio::time::sleep(sleep_duration).await; |
| } |
| } |
| }); |
| }); |
| |
| Ok(Self { |
| value_receiver, |
| psu_property, |
| sampling_interval, |
| reporting_interval, |
| }) |
| } |
| } |
| |
| #[async_trait::async_trait] |
| impl AsyncReadValue<f64> for I2cSensorReader { |
| /// Reads sensor values asynchronously. |
| /// |
| /// # Arguments |
| /// |
| /// * `sampling_interval` - The interval at which to sample the sensor. |
| /// * `reporting_interval` - The interval at which to report sensor values. |
| /// |
| /// # Returns |
| /// |
| /// A Result containing a vector of sensor values and timestamps, or an error. |
| async fn read_values( |
| &self, |
| sampling_interval: Duration, |
| reporting_interval: Duration, |
| ) -> Result<Vec<(f64, SystemTime)>, std::io::Error> { |
| self.sampling_interval.store( |
| sampling_interval.as_millis() as u64, |
| std::sync::atomic::Ordering::Relaxed, |
| ); |
| self.reporting_interval.store( |
| reporting_interval.as_millis() as u64, |
| std::sync::atomic::Ordering::Relaxed, |
| ); |
| let mut value_receiver = self.value_receiver.lock().await; |
| match value_receiver.recv().await { |
| Some(values) => Ok(values), |
| None => Err(std::io::Error::new( |
| std::io::ErrorKind::Other, |
| "io_uring thread has terminated", |
| )), |
| } |
| } |
| |
| /// Read current value and return it with timestamp. |
| /// |
| /// # Returns |
| /// |
| /// A value with its corresponding timestamp. |
| async fn get_value(&self) -> Result<(f64, SystemTime), std::io::Error> { |
| let mut value_receiver = self.value_receiver.lock().await; |
| match value_receiver.recv().await { |
| Some(values) => values.last().cloned().ok_or_else(|| { |
| std::io::Error::new(std::io::ErrorKind::Other, "No values received") |
| }), |
| None => Err(std::io::Error::new( |
| std::io::ErrorKind::Other, |
| "io_uring thread has terminated", |
| )), |
| } |
| } |
| } |
| |
| /// Creates an I2C sensor for telemetry_source_manager from the provided configuration. |
| /// |
| /// # Arguments |
| /// |
| /// * `sensor_config` - The sensor configuration. |
| /// * `all_sensor_frus` - A HashMap containing FRU (Field Replaceable Unit) information for sensors. |
| /// * `sampling_interval` - The interval at which to sample the sensor (in milliseconds). |
| /// * `reporting_interval` - The interval at which to report sensor values (in milliseconds). |
| /// |
| /// # Returns |
| /// |
| /// A Result containing the created Sensor instance or an error. |
| pub async fn create_i2c_sensor( |
| sensor_config: &SensorConfig, |
| all_sensor_frus: &HashMap<String, FruInfo>, |
| sampling_interval: u64, |
| reporting_interval: u64, |
| ) -> Result<Box<TelemetrySource<f64>>, Box<dyn std::error::Error>> { |
| let sysfs_path = find_sensor_file(sensor_config)?; |
| let psu_property = sensor_config.reading_type.0.clone(); |
| let reader = Arc::new(I2cSensorReader::new( |
| sysfs_path, |
| psu_property, |
| Duration::from_millis(sampling_interval), |
| Duration::from_millis(reporting_interval), |
| )?); |
| |
| let mut secondary_keys = Vec::new(); |
| |
| // Add sensor's ReadingType as secondary key |
| secondary_keys.push(( |
| "ReadingType".to_string(), |
| sensor_config.reading_type.1.to_string(), |
| )); |
| |
| // Add sensor's type as secondary key |
| secondary_keys.push(("Type".to_string(), "I2C".to_string())); |
| |
| // Add sensor's FruInfo if available |
| if let Some(fru_info) = all_sensor_frus.get(&sensor_config.Name) { |
| if let Some(location_code) = &fru_info.location_code { |
| secondary_keys.push(("LocationCode".to_string(), location_code.clone())); |
| } |
| if let Some(service_label) = &fru_info.service_label { |
| secondary_keys.push(("ServiceLabel".to_string(), service_label.clone())); |
| } |
| } |
| |
| let sensor = TelemetrySource::new( |
| sensor_config.Name.clone(), |
| Some(secondary_keys), |
| reader, |
| Duration::from_millis(sampling_interval), |
| Duration::from_millis(1), |
| Duration::from_millis(reporting_interval), |
| ); |
| |
| Ok(sensor) |
| } |