blob: 65a737033587d4768e65934ee9c905f7ec1c674c [file] [log] [blame]
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! This module provides functionality for creating and managing I2C sensors for the telemetry_source_manager.
#![allow(clippy::type_complexity)]
use crate::telemetry_source_manager::sensor_configs::{
find_sensor_file, FruInfo, PSUProperty, Sensor as SensorConfig,
};
use crate::telemetry_source_manager::telemetry_source_manager::{AsyncReadValue, TelemetrySource};
use anyhow::Result;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant, SystemTime};
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tokio_uring::fs::File as UringFile;
const BUFFER_SIZE: usize = 128;
/// Represents an I2C sensor reader that implements AsyncReadValue for the telemetry_source_manager.
pub struct I2cSensorReader {
// This mutex is only for making this struct to have Send + Sync, as required by trait AsyncReadValue
// There should be no real contender for this lock in data path, unless the get_value and read_values
// are called at same time.
value_receiver: Arc<Mutex<mpsc::Receiver<Vec<(f64, SystemTime)>>>>,
#[allow(dead_code)]
psu_property: PSUProperty,
sampling_interval: Arc<std::sync::atomic::AtomicU64>,
reporting_interval: Arc<std::sync::atomic::AtomicU64>,
}
impl I2cSensorReader {
/// Creates a new I2cSensorReader instance.
///
/// # Arguments
///
/// * `sysfs_path` - The sysfs path for the sensor.
/// * `psu_property` - The PSU property associated with the sensor.
/// * `sampling_interval` - The interval at which to sample the sensor.
/// * `reporting_interval` - The interval at which to report sensor values.
///
/// # Returns
///
/// A Result containing the new I2cSensorReader instance or an error.
pub fn new(
sysfs_path: String,
psu_property: PSUProperty,
sampling_interval: Duration,
reporting_interval: Duration,
) -> Result<Self> {
let (tx, rx) = mpsc::channel(16);
let value_receiver = Arc::new(Mutex::new(rx));
let sampling_interval = Arc::new(std::sync::atomic::AtomicU64::new(
sampling_interval.as_millis() as u64,
));
let reporting_interval = Arc::new(std::sync::atomic::AtomicU64::new(
reporting_interval.as_millis() as u64,
));
let sampling_interval_clone = sampling_interval.clone();
let reporting_interval_clone = reporting_interval.clone();
thread::spawn(move || {
tokio_uring::start(async move {
let file = match UringFile::open(&sysfs_path).await {
Ok(f) => f,
Err(e) => {
eprintln!("Error opening file {}: {:?}", sysfs_path, e);
return;
}
};
let mut buf = vec![0u8; BUFFER_SIZE];
let mut batch_values = Vec::new();
let mut last_report_time = SystemTime::now();
let factor = 10.0_f64.powi(psu_property.scale);
loop {
let loop_start = Instant::now();
let (res, new_buf) = file.read_at(buf, 0).await;
buf = new_buf; // Reuse the buffer
match res {
Ok(bytes_read) => {
if bytes_read == 0 {
continue;
}
let s = std::str::from_utf8(&buf[..bytes_read]).unwrap_or_default();
let raw_value = f64::from_str(s.trim()).unwrap_or_default();
let final_value = (raw_value / factor) + psu_property.offset as f64;
let now = SystemTime::now();
batch_values.push((final_value, now));
if now
.duration_since(last_report_time)
.unwrap_or(Duration::ZERO)
>= Duration::from_millis(
reporting_interval_clone
.load(std::sync::atomic::Ordering::Relaxed),
)
{
if tx.send(batch_values.clone()).await.is_err() {
// The receiver has been dropped, exit the thread
break;
}
batch_values.clear();
last_report_time = now;
}
}
Err(e) => {
eprintln!("Error reading file {}: {:?}", sysfs_path, e);
}
}
let elapsed = loop_start.elapsed();
let intended_interval = Duration::from_millis(
sampling_interval_clone.load(std::sync::atomic::Ordering::Relaxed),
);
if elapsed < intended_interval {
let sleep_duration = intended_interval - elapsed;
tokio::time::sleep(sleep_duration).await;
}
}
});
});
Ok(Self {
value_receiver,
psu_property,
sampling_interval,
reporting_interval,
})
}
}
#[async_trait::async_trait]
impl AsyncReadValue<f64> for I2cSensorReader {
/// Reads sensor values asynchronously.
///
/// # Arguments
///
/// * `sampling_interval` - The interval at which to sample the sensor.
/// * `reporting_interval` - The interval at which to report sensor values.
///
/// # Returns
///
/// A Result containing a vector of sensor values and timestamps, or an error.
async fn read_values(
&self,
sampling_interval: Duration,
reporting_interval: Duration,
) -> Result<Vec<(f64, SystemTime)>, std::io::Error> {
self.sampling_interval.store(
sampling_interval.as_millis() as u64,
std::sync::atomic::Ordering::Relaxed,
);
self.reporting_interval.store(
reporting_interval.as_millis() as u64,
std::sync::atomic::Ordering::Relaxed,
);
let mut value_receiver = self.value_receiver.lock().await;
match value_receiver.recv().await {
Some(values) => Ok(values),
None => Err(std::io::Error::new(
std::io::ErrorKind::Other,
"io_uring thread has terminated",
)),
}
}
/// Read current value and return it with timestamp.
///
/// # Returns
///
/// A value with its corresponding timestamp.
async fn get_value(&self) -> Result<(f64, SystemTime), std::io::Error> {
let mut value_receiver = self.value_receiver.lock().await;
match value_receiver.recv().await {
Some(values) => values.last().cloned().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::Other, "No values received")
}),
None => Err(std::io::Error::new(
std::io::ErrorKind::Other,
"io_uring thread has terminated",
)),
}
}
}
/// Creates an I2C sensor for telemetry_source_manager from the provided configuration.
///
/// # Arguments
///
/// * `sensor_config` - The sensor configuration.
/// * `all_sensor_frus` - A HashMap containing FRU (Field Replaceable Unit) information for sensors.
/// * `sampling_interval` - The interval at which to sample the sensor (in milliseconds).
/// * `reporting_interval` - The interval at which to report sensor values (in milliseconds).
///
/// # Returns
///
/// A Result containing the created Sensor instance or an error.
pub async fn create_i2c_sensor(
sensor_config: &SensorConfig,
all_sensor_frus: &HashMap<String, FruInfo>,
sampling_interval: u64,
reporting_interval: u64,
) -> Result<Box<TelemetrySource<f64>>, Box<dyn std::error::Error>> {
let sysfs_path = find_sensor_file(sensor_config)?;
let psu_property = sensor_config.reading_type.0.clone();
let reader = Arc::new(I2cSensorReader::new(
sysfs_path,
psu_property,
Duration::from_millis(sampling_interval),
Duration::from_millis(reporting_interval),
)?);
let mut secondary_keys = Vec::new();
// Add sensor's ReadingType as secondary key
secondary_keys.push((
"ReadingType".to_string(),
sensor_config.reading_type.1.to_string(),
));
// Add sensor's type as secondary key
secondary_keys.push(("Type".to_string(), "I2C".to_string()));
// Add sensor's FruInfo if available
if let Some(fru_info) = all_sensor_frus.get(&sensor_config.Name) {
if let Some(location_code) = &fru_info.location_code {
secondary_keys.push(("LocationCode".to_string(), location_code.clone()));
}
if let Some(service_label) = &fru_info.service_label {
secondary_keys.push(("ServiceLabel".to_string(), service_label.clone()));
}
}
let sensor = TelemetrySource::new(
sensor_config.Name.clone(),
Some(secondary_keys),
reader,
Duration::from_millis(sampling_interval),
Duration::from_millis(1),
Duration::from_millis(reporting_interval),
);
Ok(sensor)
}