| // Copyright 2025 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| //! This module provides functionality for creating and managing a sensor database, |
| //! handling sensor events, and subscribing to sensor updates in an OpenBMC environment. |
| |
| use anyhow::{Context, Result}; |
| use std::collections::HashMap; |
| use std::sync::Arc; |
| use std::time::Duration; |
| |
| use crate::app_state::AppState; |
| use crate::grpc::telemetry_server::get_threshold_config; |
| use crate::grpc::third_party_voyager::ServerConfig as ProstServerConfig; |
| use crate::grpc::third_party_voyager::Thresholds; |
| use crate::telemetry_source_manager::dbus_sensors::dbus_sensors::create_dbus_sensor; |
| use crate::telemetry_source_manager::i2c_sensors::create_i2c_sensor; |
| use crate::telemetry_source_manager::sensor_configs::{extract_sensor_frus, extract_sensors}; |
| use crate::telemetry_source_manager::telemetry_source_manager::{ |
| SubscriptionType, Telemetry, TelemetrySourceManager, TypedTelemetry, |
| }; |
| use log::{error, info, warn}; |
| use redfish_codegen::models::odata_v4; |
| |
| /// Creates a telemetry_source_manager by reading sensor configurations from Entity Manager config files. |
| /// |
| /// # Arguments |
| /// |
| /// * `sampling_interval` - The default sampling interval for sensors. |
| /// * `reporting_interval` - The default reporting interval for sensors. |
| /// * `_server_config` - An optional server configuration. |
| /// |
| /// # Returns |
| /// |
| /// An `Arc<TelemetrySourceManager>` if successful, or an error if the operation fails. |
| pub async fn create_telemetry_source_manager( |
| sampling_interval: u64, |
| reporting_interval: u64, |
| _server_config: Arc<tokio::sync::RwLock<ProstServerConfig>>, |
| em_configs: Vec<String>, |
| ) -> Result<Arc<TelemetrySourceManager>, Box<dyn std::error::Error>> { |
| let telemetry_source_manager = TelemetrySourceManager::new(); |
| |
| // Extract all real sensors from Entity Manager config files |
| |
| let mut all_sensors = Vec::new(); |
| for file_path in &em_configs { |
| let path = std::path::PathBuf::from(file_path); |
| if path.is_file() && path.extension().and_then(|s| s.to_str()) == Some("json") { |
| all_sensors.extend(extract_sensors(path.to_str().unwrap_or_default()).await?); |
| } else { |
| eprintln!("Skipping non-JSON file: {:?}", path); |
| } |
| } |
| |
| // Extract all sensor's FRU info from EM's config files |
| let mut all_sensor_frus = HashMap::new(); |
| for file_path in &em_configs { |
| let path = std::path::PathBuf::from(file_path); |
| if path.is_file() && path.extension().and_then(|s| s.to_str()) == Some("json") { |
| all_sensor_frus.extend(extract_sensor_frus(path.to_str().unwrap_or_default())?); |
| } |
| } |
| |
| // Create TelemetrySource objects from parsed configurations, add them to the telemetry_source_manager. |
| for sensor_config in all_sensors { |
| if let (Some(_bus), Some(_address)) = |
| (sensor_config.Bus.as_ref(), sensor_config.Address.as_ref()) |
| { |
| #[allow(unused_mut)] |
| #[allow(unused_assignments)] |
| let mut add_this_sensor = true; |
| #[cfg(feature = "load-sources-from-server-config")] |
| { |
| let config_map = _server_config.read().await; |
| add_this_sensor = config_map |
| .threshold_config |
| .keys() |
| .any(|key| key.contains(&sensor_config.Name)); |
| } |
| |
| if add_this_sensor { |
| if let Ok(sensor) = create_i2c_sensor( |
| &sensor_config, |
| &all_sensor_frus, |
| sampling_interval, |
| reporting_interval, |
| ) |
| .await |
| { |
| println!("Adding I2C sensor {}", &sensor_config.Name); |
| telemetry_source_manager.add_source::<f64>(sensor).await?; |
| } else { |
| warn!("no sysfs_path found for sensor {:?}", sensor_config); |
| } |
| } else { |
| println!( |
| "Skip I2C sensor {} that is not in server_config", |
| &sensor_config.Name |
| ); |
| } |
| } else if sensor_config.sensor_type == "PLDM" { |
| if let Ok(sensor) = create_dbus_sensor( |
| &sensor_config, |
| &all_sensor_frus, |
| sampling_interval, |
| reporting_interval, |
| ) |
| .await |
| { |
| info!("Adding PLDM sensor {}", sensor_config.Name); |
| telemetry_source_manager.add_source::<f64>(sensor).await?; |
| } |
| } |
| } |
| |
| Ok(telemetry_source_manager) |
| } |
| |
| /// Extracts the sensor name from a Redfish sensor ID. |
| /// |
| /// # Arguments |
| /// |
| /// * `sensor_name` - The Redfish sensor ID. |
| /// |
| /// # Returns |
| /// |
| /// A `String` containing the extracted sensor name, or an error if the extraction fails. |
| fn get_sensor_name(sensor_name: &odata_v4::Id) -> Result<String, Box<dyn std::error::Error>> { |
| let sensor_name = sensor_name.0.split('/').last().unwrap_or_default(); |
| let parts: Vec<&str> = sensor_name.split('_').collect(); |
| |
| if parts.is_empty() { |
| return Err(anyhow::anyhow!("Invalid Redfish sensor name pattern").into()); |
| } |
| |
| // FIXME! make "fanpwm_fan1_pwm" or "fan_pwm_fan1_pwm" to "/xyz/openbmc_project/sensors/fan_pwm/fan1_pwm" |
| if parts[0].starts_with("fan") { |
| // Handle fan sensor names |
| if parts.len() < 2 { |
| return Err(anyhow::anyhow!("Invalid fan sensor name pattern").into()); |
| } |
| // Take the last two segments |
| Ok(parts[parts.len() - 2..].join("_")) |
| } else { |
| // Handle non-fan sensor names |
| if parts.len() < 2 { |
| return Err(anyhow::anyhow!("Invalid sensor name pattern").into()); |
| } |
| Ok(parts[1..].join("_")) |
| } |
| } |
| |
| /// Send sensor values to gRPC subscriber. |
| /// |
| /// # Arguments |
| /// |
| /// * `sensor_name_id` - The Redfish sensor ID. |
| /// * `telemetries` - The telemetries from subscription to telemetry_source_manager. |
| /// * `tx` - The transmit half of a channel for sending `TypedTelemetry<f64>` updates. |
| /// |
| /// # Returns |
| /// |
| /// `Ok(())` if successful, or an error if the operation fails. |
| async fn send_sensor_values( |
| sensor_name_id: &odata_v4::Id, |
| telemetries: Vec<Box<dyn Telemetry + Send>>, |
| tx: &tokio::sync::mpsc::Sender<Vec<TypedTelemetry<f64>>>, |
| ) -> Result<()> { |
| let event_responses: Vec<TypedTelemetry<f64>> = telemetries |
| .into_iter() |
| .filter_map(|telemetry| { |
| telemetry |
| .as_any() |
| .downcast_ref::<TypedTelemetry<f64>>() |
| .map(|tt| { |
| let mut new_tt = tt.clone(); |
| new_tt.source_name.clone_from(&sensor_name_id.0); |
| new_tt |
| }) |
| }) |
| .collect(); |
| |
| if !event_responses.is_empty() { |
| tx.send(event_responses) |
| .await |
| .context("Failed to send event responses")?; |
| } |
| |
| Ok(()) |
| } |
| |
| /// Polls a single sensor and sends updates through a channel. |
| /// |
| /// # Arguments |
| /// |
| /// * `sensor_name_id` - The Redfish odata_id of the sensor. |
| /// * `subscription_type` - The subscription parameters. |
| /// * `event_source_config` - The Thresholds associated with this sensor. |
| /// * `tx` - The transmit half of a channel for sending `TypedTelemetry<f64>` updates. |
| /// * `telemetry_source_manager` - The `TelemetrySourceManager` containing sensor information. |
| /// |
| /// # Returns |
| /// |
| /// `Ok(())` if successful, or an error if the operation fails. |
| async fn poll_one_sensor( |
| sensor_name_id: &odata_v4::Id, |
| subscription_type: SubscriptionType, |
| event_source_config: Option<Thresholds>, |
| tx: tokio::sync::mpsc::Sender<Vec<TypedTelemetry<f64>>>, |
| telemetry_source_manager: Arc<TelemetrySourceManager>, |
| ) -> Result<(), Box<dyn std::error::Error>> { |
| let sensor_name = get_sensor_name(sensor_name_id)?; |
| let sensors = telemetry_source_manager.query_sources_by_names(&[&sensor_name])?; |
| if sensors.len() != 1 { |
| error!("query_sources_by_names failed for {sensor_name}"); |
| return Err(anyhow::anyhow!("Invalid query_sources_by_names result").into()); |
| } |
| match subscription_type { |
| SubscriptionType::Periodical(sampling_interval, reporting_interval) => { |
| let mut current_sample_rate = sampling_interval; |
| let mut current_reporting_rate = reporting_interval; |
| if let Some(config) = &event_source_config { |
| // Threshold mode |
| loop { |
| #[cfg(debug_assertions)] |
| println!( |
| "Sensor {} interval_duration {:?}, reporting_duration {:?}", |
| sensor_name, current_sample_rate, current_reporting_rate |
| ); |
| |
| let mut subscription = telemetry_source_manager.subscribe_telemetries::<f64>( |
| &sensor_name, |
| SubscriptionType::Periodical(current_sample_rate, current_reporting_rate), |
| )?; |
| |
| while let Some(telemetries) = subscription.telemetry_receiver.recv().await { |
| let last_value = telemetries |
| .last() |
| .and_then(|t| t.as_any().downcast_ref::<TypedTelemetry<f64>>()) |
| .map(|t| t.value); |
| |
| send_sensor_values(sensor_name_id, telemetries, &tx).await?; |
| |
| if let Some(last_value) = last_value { |
| let threshold_config = get_threshold_config(last_value, config); |
| if let Some(threshold_config) = threshold_config { |
| let new_sampling_interval = |
| if threshold_config.sample_frequency_expect_ns > 0 { |
| Duration::from_nanos( |
| threshold_config.sample_frequency_expect_ns, |
| ) |
| } else { |
| Duration::from_secs(1) |
| }; |
| let new_reporting_interval = |
| if threshold_config.export_frequency_ns > 0 { |
| Duration::from_nanos(threshold_config.export_frequency_ns) |
| } else { |
| Duration::from_secs(10) |
| }; |
| |
| if new_sampling_interval != current_sample_rate { |
| #[cfg(debug_assertions)] |
| println!( |
| "Sensor {} sample rate changed from {:?} to {:?}", |
| sensor_name, current_sample_rate, new_sampling_interval |
| ); |
| current_sample_rate = new_sampling_interval; |
| current_reporting_rate = new_reporting_interval; |
| break; // This will exit the inner while loop and create a new subscription |
| } |
| } |
| } |
| } |
| } |
| } else { |
| // Normal Periodic mode |
| let mut subscription = telemetry_source_manager.subscribe_telemetries::<f64>( |
| &sensor_name, |
| SubscriptionType::Periodical(sampling_interval, reporting_interval), |
| )?; |
| |
| while let Some(telemetries) = subscription.telemetry_receiver.recv().await { |
| send_sensor_values(sensor_name_id, telemetries, &tx).await?; |
| } |
| } |
| } |
| SubscriptionType::OnChange => { |
| let mut subscription = telemetry_source_manager |
| .subscribe_telemetries::<f64>(&sensor_name, SubscriptionType::OnChange)?; |
| |
| while let Some(telemetries) = subscription.telemetry_receiver.recv().await { |
| send_sensor_values(sensor_name_id, telemetries, &tx).await?; |
| } |
| } |
| // TODO: Add support for other SubscriptionType |
| #[allow(unreachable_patterns)] |
| _ => { |
| return Err(anyhow::anyhow!("Must give a subscribe type").into()); |
| } |
| } |
| Ok(()) |
| } |
| |
| /// Handles sensor subscription requests and returns a stream of batch `TypedTelemetry` objects. |
| /// |
| /// # Arguments |
| /// |
| /// * `state` - The application state. |
| /// * `subscription_type` - The subscription parameters. |
| /// * `urls` - The Redfish URLs to identify sensors. |
| /// * `server_config` - An optional server configuration. |
| /// |
| /// # Returns |
| /// |
| /// A `ReceiverStream` of `Vec<TypedTelemetry<f64>>` objects. |
| pub async fn handle_subscribe_inner( |
| state: Arc<AppState>, |
| subscription_type: SubscriptionType, |
| urls: Vec<String>, |
| server_config: Option<Arc<tokio::sync::RwLock<ProstServerConfig>>>, |
| ) -> tokio_stream::wrappers::ReceiverStream<Vec<TypedTelemetry<f64>>> { |
| let (tx, rx) = tokio::sync::mpsc::channel(16); |
| for url in urls { |
| // Extract sensor name from odata_id |
| let sensor_name = url.split('/').last().unwrap_or_default().to_string(); |
| |
| let sensor_name_id = odata_v4::Id(url.to_owned()); |
| |
| let config = if let Some(ref config_map) = server_config { |
| let config_map = config_map.read().await; |
| config_map.threshold_config.get(&sensor_name).cloned() |
| } else { |
| None |
| }; |
| |
| let tx_clone = tx.clone(); |
| let telemetry_source_manager = state.telemetry_source_manager.clone(); |
| let subscription_type_clone = subscription_type.clone(); |
| tokio::spawn(async move { |
| let e = poll_one_sensor( |
| &sensor_name_id, |
| subscription_type_clone, |
| config, |
| tx_clone, |
| telemetry_source_manager, |
| ) |
| .await; |
| println!("{:?}", e); |
| }); |
| } |
| tokio_stream::wrappers::ReceiverStream::new(rx) |
| } |