blob: f5c844fdc69601fa1e5f6e92f901ec5b81eded49 [file] [log] [blame]
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! This module provides functionality for creating and managing a sensor database,
//! handling sensor events, and subscribing to sensor updates in an OpenBMC environment.
use anyhow::{Context, Result};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use crate::app_state::AppState;
use crate::grpc::telemetry_server::get_threshold_config;
use crate::grpc::third_party_voyager::ServerConfig as ProstServerConfig;
use crate::grpc::third_party_voyager::Thresholds;
use crate::telemetry_source_manager::dbus_sensors::dbus_sensors::create_dbus_sensor;
use crate::telemetry_source_manager::i2c_sensors::create_i2c_sensor;
use crate::telemetry_source_manager::sensor_configs::{extract_sensor_frus, extract_sensors};
use crate::telemetry_source_manager::telemetry_source_manager::{
SubscriptionType, Telemetry, TelemetrySourceManager, TypedTelemetry,
};
use log::{error, info, warn};
use redfish_codegen::models::odata_v4;
/// Creates a `TelemetrySourceManager` populated from Entity Manager config files.
///
/// Every entry of `em_configs` that is an existing regular file with a `json` extension is
/// parsed for sensor definitions and for sensor FRU information; any other entry is skipped
/// with a warning. Each parsed sensor is then turned into a telemetry source:
///
/// * a sensor with both `Bus` and `Address` set is created as an I2C sensor,
/// * otherwise a sensor whose `sensor_type` is `"PLDM"` is created as a D-Bus sensor,
/// * anything else is ignored.
///
/// A sensor whose source object cannot be created is logged and skipped; it does not abort
/// the whole operation.
///
/// # Arguments
///
/// * `sampling_interval` - The default sampling interval for sensors.
/// * `reporting_interval` - The default reporting interval for sensors.
/// * `_server_config` - The shared server configuration. It is only read when the
///   `load-sources-from-server-config` feature is enabled, in which case an I2C sensor is
///   added only if some `threshold_config` key contains the sensor's name.
/// * `em_configs` - Paths of the Entity Manager config files to load.
///
/// # Returns
///
/// The populated `Arc<TelemetrySourceManager>`.
///
/// # Errors
///
/// Returns an error if parsing sensors or FRU information from a config file fails, or if
/// adding a created source to the manager fails.
pub async fn create_telemetry_source_manager(
    sampling_interval: u64,
    reporting_interval: u64,
    _server_config: Arc<tokio::sync::RwLock<ProstServerConfig>>,
    em_configs: Vec<String>,
) -> Result<Arc<TelemetrySourceManager>, Box<dyn std::error::Error>> {
    let telemetry_source_manager = TelemetrySourceManager::new();

    // Decide once which entries are usable JSON config files. The original `String` is kept as
    // the path handed to the parsers, so no lossy `Path -> str` conversion is needed.
    let mut json_configs: Vec<&str> = Vec::with_capacity(em_configs.len());
    for file_path in &em_configs {
        let path = std::path::Path::new(file_path);
        if path.is_file() && path.extension().and_then(|ext| ext.to_str()) == Some("json") {
            json_configs.push(file_path.as_str());
        } else {
            warn!("Skipping non-JSON file: {:?}", path);
        }
    }

    // Extract all real sensors from Entity Manager config files.
    let mut all_sensors = Vec::new();
    for &config_path in &json_configs {
        all_sensors.extend(extract_sensors(config_path).await?);
    }

    // Extract all sensors' FRU info from the same config files.
    let mut all_sensor_frus = HashMap::new();
    for &config_path in &json_configs {
        all_sensor_frus.extend(extract_sensor_frus(config_path)?);
    }

    // Create telemetry sources from the parsed configurations and register them.
    for sensor_config in all_sensors {
        if sensor_config.Bus.is_some() && sensor_config.Address.is_some() {
            // Without the `load-sources-from-server-config` feature the flag is never
            // reassigned, which would otherwise trigger these lints.
            #[allow(unused_mut, unused_assignments)]
            let mut add_this_sensor = true;
            #[cfg(feature = "load-sources-from-server-config")]
            {
                let config_map = _server_config.read().await;
                add_this_sensor = config_map
                    .threshold_config
                    .keys()
                    .any(|key| key.contains(&sensor_config.Name));
            }

            if !add_this_sensor {
                info!(
                    "Skip I2C sensor {} that is not in server_config",
                    &sensor_config.Name
                );
                continue;
            }

            if let Ok(sensor) = create_i2c_sensor(
                &sensor_config,
                &all_sensor_frus,
                sampling_interval,
                reporting_interval,
            )
            .await
            {
                info!("Adding I2C sensor {}", &sensor_config.Name);
                telemetry_source_manager.add_source::<f64>(sensor).await?;
            } else {
                warn!("no sysfs_path found for sensor {:?}", sensor_config);
            }
        } else if sensor_config.sensor_type == "PLDM" {
            if let Ok(sensor) = create_dbus_sensor(
                &sensor_config,
                &all_sensor_frus,
                sampling_interval,
                reporting_interval,
            )
            .await
            {
                info!("Adding PLDM sensor {}", sensor_config.Name);
                telemetry_source_manager.add_source::<f64>(sensor).await?;
            } else {
                // Previously dropped silently; make the skipped sensor visible in the logs.
                warn!("Failed to create PLDM sensor {}", sensor_config.Name);
            }
        }
    }

    Ok(telemetry_source_manager)
}
/// Extracts the sensor name from a Redfish sensor ID.
///
/// Only the last `/`-separated segment of the ID is considered. That segment is split on `_`
/// and the leading type prefix is dropped:
///
/// * if the segment starts with `fan`, the last two `_`-separated parts are kept
///   (e.g. `fanpwm_fan1_pwm` and `fan_pwm_fan1_pwm` both give `fan1_pwm`);
/// * otherwise everything after the first part is kept
///   (e.g. `temperature_cpu0_temp` gives `cpu0_temp`).
///
/// # Arguments
///
/// * `sensor_name` - The Redfish sensor ID.
///
/// # Returns
///
/// A `String` containing the extracted sensor name.
///
/// # Errors
///
/// Returns an error if the last segment contains no `_` separator, i.e. it has fewer than
/// two parts.
fn get_sensor_name(sensor_name: &odata_v4::Id) -> Result<String, Box<dyn std::error::Error>> {
    // `rsplit` yields the trailing segment first, so the whole ID is not walked just to reach
    // its end. It always yields at least one item; the fallback only mirrors the old code.
    let leaf = sensor_name.0.rsplit('/').next().unwrap_or_default();
    // `split` never yields an empty sequence, so `parts` always has at least one element.
    let parts: Vec<&str> = leaf.split('_').collect();
    // "fan" contains no '_', so testing the whole segment equals testing its first part.
    let is_fan = leaf.starts_with("fan");

    // FIXME! make "fanpwm_fan1_pwm" or "fan_pwm_fan1_pwm" to "/xyz/openbmc_project/sensors/fan_pwm/fan1_pwm"
    let first_kept = match (is_fan, parts.len()) {
        (true, 0..=1) => return Err(anyhow::anyhow!("Invalid fan sensor name pattern").into()),
        (false, 0..=1) => return Err(anyhow::anyhow!("Invalid sensor name pattern").into()),
        // Fan sensors: take the last two segments.
        (true, len) => len - 2,
        // Other sensors: drop only the leading type prefix.
        (false, _) => 1,
    };
    Ok(parts[first_kept..].join("_"))
}
/// Forwards a batch of sensor readings to the gRPC subscriber channel.
///
/// Only telemetries that downcast to `TypedTelemetry<f64>` are forwarded; each one is cloned
/// and its `source_name` is overwritten with the Redfish sensor ID. Nothing is sent when no
/// telemetry in the batch qualifies.
///
/// # Arguments
///
/// * `sensor_name_id` - The Redfish sensor ID stamped onto every forwarded telemetry.
/// * `telemetries` - The telemetries received from a telemetry_source_manager subscription.
/// * `tx` - The transmit half of a channel for sending `TypedTelemetry<f64>` updates.
///
/// # Returns
///
/// `Ok(())` if the batch was sent or was empty, or an error if sending on `tx` fails.
async fn send_sensor_values(
    sensor_name_id: &odata_v4::Id,
    telemetries: Vec<Box<dyn Telemetry + Send>>,
    tx: &tokio::sync::mpsc::Sender<Vec<TypedTelemetry<f64>>>,
) -> Result<()> {
    let mut batch: Vec<TypedTelemetry<f64>> = Vec::new();
    for telemetry in telemetries {
        if let Some(typed) = telemetry.as_any().downcast_ref::<TypedTelemetry<f64>>() {
            // Subscribers identify the reading by its Redfish ID, not the internal source name.
            let mut relabelled = typed.clone();
            relabelled.source_name = sensor_name_id.0.clone();
            batch.push(relabelled);
        }
    }

    if batch.is_empty() {
        return Ok(());
    }

    tx.send(batch)
        .await
        .context("Failed to send event responses")
}
/// Polls a single sensor and sends updates through a channel.
///
/// The sensor is resolved from its Redfish ID and must match exactly one source in the
/// manager. Telemetry batches received from the subscription are then forwarded to `tx`
/// until the subscription's receiver yields `None` or forwarding fails.
///
/// For a `Periodical` subscription with `event_source_config` set ("threshold mode"), the
/// last `f64` value of every batch is passed to `get_threshold_config`. When that yields a
/// configuration whose sampling or reporting interval differs from the one currently in
/// use, the subscription is replaced by a new one using the new intervals. A configured
/// interval that is not greater than zero falls back to 1 second (sampling) or 10 seconds
/// (reporting).
///
/// # Arguments
///
/// * `sensor_name_id` - The Redfish odata_id of the sensor.
/// * `subscription_type` - The subscription parameters.
/// * `event_source_config` - The Thresholds associated with this sensor.
/// * `tx` - The transmit half of a channel for sending `TypedTelemetry<f64>` updates.
/// * `telemetry_source_manager` - The `TelemetrySourceManager` containing sensor information.
///
/// # Returns
///
/// `Ok(())` once the subscription ends, or an error if the sensor cannot be resolved,
/// subscribing fails, forwarding to `tx` fails, or the subscription type is unsupported.
async fn poll_one_sensor(
    sensor_name_id: &odata_v4::Id,
    subscription_type: SubscriptionType,
    event_source_config: Option<Thresholds>,
    tx: tokio::sync::mpsc::Sender<Vec<TypedTelemetry<f64>>>,
    telemetry_source_manager: Arc<TelemetrySourceManager>,
) -> Result<(), Box<dyn std::error::Error>> {
    let sensor_name = get_sensor_name(sensor_name_id)?;
    let sensors = telemetry_source_manager.query_sources_by_names(&[&sensor_name])?;
    if sensors.len() != 1 {
        error!("query_sources_by_names failed for {sensor_name}");
        return Err(anyhow::anyhow!("Invalid query_sources_by_names result").into());
    }
    match subscription_type {
        SubscriptionType::Periodical(sampling_interval, reporting_interval) => {
            if let Some(config) = &event_source_config {
                // Threshold mode
                let mut current_sample_rate = sampling_interval;
                let mut current_reporting_rate = reporting_interval;
                loop {
                    #[cfg(debug_assertions)]
                    println!(
                        "Sensor {} interval_duration {:?}, reporting_duration {:?}",
                        sensor_name, current_sample_rate, current_reporting_rate
                    );
                    let mut subscription = telemetry_source_manager.subscribe_telemetries::<f64>(
                        &sensor_name,
                        SubscriptionType::Periodical(current_sample_rate, current_reporting_rate),
                    )?;
                    // Distinguishes "re-subscribe with new rates" from "stream ended".
                    let mut rates_changed = false;
                    while let Some(telemetries) = subscription.telemetry_receiver.recv().await {
                        // Capture the value before the batch is moved into the sender.
                        let last_value = telemetries
                            .last()
                            .and_then(|t| t.as_any().downcast_ref::<TypedTelemetry<f64>>())
                            .map(|t| t.value);
                        send_sensor_values(sensor_name_id, telemetries, &tx).await?;

                        let threshold_config =
                            last_value.and_then(|value| get_threshold_config(value, config));
                        if let Some(threshold_config) = threshold_config {
                            let new_sampling_interval =
                                if threshold_config.sample_frequency_expect_ns > 0 {
                                    Duration::from_nanos(
                                        threshold_config.sample_frequency_expect_ns,
                                    )
                                } else {
                                    Duration::from_secs(1)
                                };
                            let new_reporting_interval =
                                if threshold_config.export_frequency_ns > 0 {
                                    Duration::from_nanos(threshold_config.export_frequency_ns)
                                } else {
                                    Duration::from_secs(10)
                                };
                            // Compare both intervals: a threshold that only changes the
                            // export interval must take effect as well.
                            if new_sampling_interval != current_sample_rate
                                || new_reporting_interval != current_reporting_rate
                            {
                                #[cfg(debug_assertions)]
                                println!(
                                    "Sensor {} rates changed: sampling {:?} -> {:?}, reporting {:?} -> {:?}",
                                    sensor_name,
                                    current_sample_rate,
                                    new_sampling_interval,
                                    current_reporting_rate,
                                    new_reporting_interval
                                );
                                current_sample_rate = new_sampling_interval;
                                current_reporting_rate = new_reporting_interval;
                                rates_changed = true;
                                break; // Leave the receive loop and create a new subscription
                            }
                        }
                    }
                    if !rates_changed {
                        // The receiver yielded `None`: the stream is over. Finish like the
                        // other modes instead of re-subscribing forever.
                        break;
                    }
                }
            } else {
                // Normal Periodic mode
                let mut subscription = telemetry_source_manager.subscribe_telemetries::<f64>(
                    &sensor_name,
                    SubscriptionType::Periodical(sampling_interval, reporting_interval),
                )?;
                while let Some(telemetries) = subscription.telemetry_receiver.recv().await {
                    send_sensor_values(sensor_name_id, telemetries, &tx).await?;
                }
            }
        }
        SubscriptionType::OnChange => {
            let mut subscription = telemetry_source_manager
                .subscribe_telemetries::<f64>(&sensor_name, SubscriptionType::OnChange)?;
            while let Some(telemetries) = subscription.telemetry_receiver.recv().await {
                send_sensor_values(sensor_name_id, telemetries, &tx).await?;
            }
        }
        // TODO: Add support for other SubscriptionType
        // The catch-all may be unreachable depending on the variants `SubscriptionType` has.
        #[allow(unreachable_patterns)]
        _ => {
            return Err(anyhow::anyhow!("Must give a subscribe type").into());
        }
    }
    Ok(())
}
/// Handles sensor subscription requests and returns a stream of batch `TypedTelemetry` objects.
///
/// # Arguments
///
/// * `state` - The application state.
/// * `subscription_type` - The subscription parameters.
/// * `urls` - The Redfish URLs to identify sensors.
/// * `server_config` - An optional server configuration.
///
/// # Returns
///
/// A `ReceiverStream` of `Vec<TypedTelemetry<f64>>` objects.
pub async fn handle_subscribe_inner(
state: Arc<AppState>,
subscription_type: SubscriptionType,
urls: Vec<String>,
server_config: Option<Arc<tokio::sync::RwLock<ProstServerConfig>>>,
) -> tokio_stream::wrappers::ReceiverStream<Vec<TypedTelemetry<f64>>> {
let (tx, rx) = tokio::sync::mpsc::channel(16);
for url in urls {
// Extract sensor name from odata_id
let sensor_name = url.split('/').last().unwrap_or_default().to_string();
let sensor_name_id = odata_v4::Id(url.to_owned());
let config = if let Some(ref config_map) = server_config {
let config_map = config_map.read().await;
config_map.threshold_config.get(&sensor_name).cloned()
} else {
None
};
let tx_clone = tx.clone();
let telemetry_source_manager = state.telemetry_source_manager.clone();
let subscription_type_clone = subscription_type.clone();
tokio::spawn(async move {
let e = poll_one_sensor(
&sensor_name_id,
subscription_type_clone,
config,
tx_clone,
telemetry_source_manager,
)
.await;
println!("{:?}", e);
});
}
tokio_stream::wrappers::ReceiverStream::new(rx)
}