| // Copyright 2025 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
//! BMC Telemetry Service: implementation of the Voyager telemetry gRPC service.
| //! |
| //! This module provides a gRPC-based telemetry service for Baseboard Management Controllers (BMCs). |
| //! It handles subscription requests, processes telemetry data, and streams updates to clients. |
| |
| use crate::app_state::AppState; |
| use futures::StreamExt; |
| use std::collections::HashMap; |
| use std::pin::Pin; |
| use std::sync::Arc; |
| use std::time::Duration; |
| use tokio::sync::mpsc; |
| use tokio_stream::wrappers::ReceiverStream; |
| use tokio_stream::Stream; |
| use tonic::{Code, Request, Response, Status}; |
| |
| use crate::grpc::third_party_voyager::{ |
| machine_telemetry_server::MachineTelemetry, request_fqp, DataPoint, FqpType, |
| Request as TelemetryRequest, RequestFqp, SetRequest, TypedStruct, TypedValue, Update, |
| }; |
| use crate::handlers::xpath::get_xpath_urls; |
| use crate::telemetry_source_manager::telemetry_source_manager::{SubscriptionType, TypedTelemetry}; |
| use crate::telemetry_source_manager::telemetry_source_manager_api::handle_subscribe_inner; |
| |
| use super::third_party_voyager; |
| |
| use crate::grpc::server_config::voyager_server_config::ServerConfig as ProtobufServerConfig; |
| use prost::Message; |
| use protobuf::text_format::parse_from_str; |
| use protobuf::Message as ProtobufMessage; |
| use third_party_voyager::ServerConfig as ProstServerConfig; |
| use third_party_voyager::{typed_value, Threshold, Thresholds}; |
| |
| /// Represents the BMC Telemetry Service. |
| pub struct BmcTelemetryService { |
| /// Shared application state. |
| pub state: Arc<AppState>, |
| /// Server configuration, shared and protected by a read-write lock. |
| pub server_config: Arc<tokio::sync::RwLock<ProstServerConfig>>, |
| } |
| |
| /// Loads the server configuration from a textproto file. |
| /// |
| /// # Arguments |
| /// |
| /// * `file_path` - The path to the configuration file in textproto format, which contains a |
| /// ServerConfig message defined in voyager_server_config proto file. |
| /// |
| /// # Returns |
| /// |
| /// Returns a `Result` containing the parsed `ServerConfig` or an error. |
| pub fn load_server_config( |
| file_path: &str, |
| ) -> Result<ProstServerConfig, Box<dyn std::error::Error>> { |
| let content = std::fs::read_to_string(file_path)?; |
| |
| // Use protobuf creat to parse a textproto string to protobuf crate format ServerConfig object.. |
| let config: ProtobufServerConfig = parse_from_str(&content)?; |
| |
| // Encode the protobuf crate format ServerConfig object into binary format |
| let mut binary_data = Vec::new(); |
| config |
| .write_to_vec(&mut binary_data) |
| .expect("Failed to encode ProtobufServerConfig object to binary"); |
| |
| // Use this trick to convert protobuf crate format message to prost crate format message |
| let config = ProstServerConfig::decode(&*binary_data) |
| .expect("Failed to decode binary data into ProstServerConfig"); |
| println!("Loaded ServerConfig: {:#?}", config); |
| |
| Ok(config) |
| } |
| |
| /// Retrieves the double value from a `TypedValue`. |
| /// |
| /// # Arguments |
| /// |
| /// * `typed_value` - The `TypedValue` to extract the double from. |
| /// |
| /// # Returns |
| /// |
| /// An `Option<f64>` containing the double value if present. |
| fn get_double_value(typed_value: &TypedValue) -> Option<f64> { |
| typed_value.value.as_ref().and_then(|value| { |
| if let typed_value::Value::DoubleVal(v) = value { |
| Some(*v) |
| } else { |
| None |
| } |
| }) |
| } |
| |
| /// Determines the appropriate threshold configuration based on a reading. |
| /// |
| /// # Arguments |
| /// |
| /// * `reading` - The current sensor reading. |
| /// * `config` - The threshold configuration. |
| /// |
| /// # Returns |
| /// |
| /// An `Option` containing a reference to the appropriate `Threshold`, if any. |
| pub fn get_threshold_config(reading: f64, config: &Thresholds) -> Option<&Threshold> { |
| let thresholds = &config.threshold; |
| |
| if thresholds.is_empty() { |
| return None; |
| } |
| |
| let mut current_index = 0; |
| |
| while current_index < thresholds.len() { |
| // SAFETY: already checked for out-of-range access |
| let threshold = &thresholds[current_index]; |
| let cross_above = threshold |
| .cross_above_value |
| .as_ref() |
| .and_then(get_double_value); |
| let cross_below = threshold |
| .cross_below_value |
| .as_ref() |
| .and_then(get_double_value); |
| |
| match (cross_above, cross_below) { |
| (Some(above), Some(below)) => { |
| if reading > above && current_index < thresholds.len() - 1 { |
| current_index += 1; |
| } else if reading < below && current_index > 0 { |
| current_index -= 1; |
| } else { |
| return Some(threshold); |
| } |
| } |
| (Some(above), None) => { |
| if reading > above && current_index < thresholds.len() - 1 { |
| current_index += 1; |
| } else { |
| return Some(threshold); |
| } |
| } |
| (None, Some(below)) => { |
| if reading < below && current_index > 0 { |
| current_index -= 1; |
| } else { |
| return Some(threshold); |
| } |
| } |
| (None, None) => return Some(threshold), |
| } |
| } |
| |
| // If we've gone through all thresholds, return the last one |
| thresholds.last() |
| } |
| |
| /// Creates a `DataPoint` from an `TypedTelemetry<f64>`. |
| /// |
| /// # Arguments |
| /// |
| /// * `event` - The `TypedTelemetry<f64>` to convert. |
| /// * `predecessor` - The predecessor of 'event' in same batch. |
| /// |
| /// # Returns |
| /// |
| /// A `Result` containing the created `DataPoint` or an error. |
| fn create_sensor_data_point( |
| event: &TypedTelemetry<f64>, |
| predecessor: Option<&TypedTelemetry<f64>>, |
| ) -> Result<DataPoint, anyhow::Error> { |
| let timestamp_ns = event |
| .timestamp |
| .duration_since(std::time::UNIX_EPOCH) |
| .unwrap_or_default() |
| .as_nanos() as u64; |
| |
| let mut fields = HashMap::new(); |
| let mut changed = false; |
| |
| if predecessor.map_or(true, |p| p.source_name != event.source_name) { |
| fields.insert( |
| "@odata.id".to_string(), |
| TypedValue { |
| value: Some(typed_value::Value::StringVal(event.source_name.clone())), |
| }, |
| ); |
| changed = true; |
| } |
| |
| if predecessor.map_or(true, |p| p.value != event.value) { |
| fields.insert( |
| "SensorValue".to_string(), |
| TypedValue { |
| value: Some(typed_value::Value::DoubleVal(event.value)), |
| }, |
| ); |
| changed = true; |
| } |
| |
| if changed || predecessor.is_none() { |
| fields.insert( |
| "Status.Health".to_string(), |
| TypedValue { |
| value: Some(typed_value::Value::StringVal("OK".to_owned())), |
| }, |
| ); |
| } |
| |
| Ok(DataPoint { |
| timestamp_ns, |
| data: Some(third_party_voyager::data_point::Data::KeyValue( |
| TypedStruct { fields }, |
| )), |
| ..Default::default() |
| }) |
| } |
| |
| /// Creates and manages a stream of telemetry updates. |
| /// |
| /// # Arguments |
| /// |
| /// * `state` - The shared application state. |
| /// * `req_id` - The request ID. |
| /// * `subscription_type` - The subscription type parameters. |
| /// * `urls` - The Redfish URLs of telemetry events. |
| /// * `server_config` - The server configuration. |
| /// * `tx` - The channel sender for updates. |
| /// |
| /// # Returns |
| /// |
| /// A `Result` indicating success or containing a `Status` error. |
| async fn create_response_stream( |
| state: Arc<AppState>, |
| req_id: String, |
| subscription_type: SubscriptionType, |
| urls: Vec<String>, |
| server_config: Arc<tokio::sync::RwLock<ProstServerConfig>>, |
| tx: mpsc::Sender<Result<Update, Status>>, |
| ) -> Result<(), Status> { |
| let mut rx_stream = |
| handle_subscribe_inner(state.clone(), subscription_type, urls, Some(server_config)).await; |
| |
| while let Some(batch_events) = rx_stream.next().await { |
| let mut current_update: Option<Update> = None; |
| let mut predecessor: Option<TypedTelemetry<f64>> = None; |
| for event in batch_events { |
| let data_point = |
| create_sensor_data_point(&event, predecessor.as_ref()).map_err(|e| { |
| Status::internal(format!("Failed to create sensor data point: {}", e)) |
| })?; |
| |
| if let Some(update) = &mut current_update { |
| update.data_points.push(data_point); |
| } else { |
| current_update = Some(Update { |
| req_id: req_id.clone(), |
| data_points: vec![data_point], |
| ..Default::default() |
| }); |
| } |
| predecessor = Some(event); |
| } |
| |
| if let Some(update) = current_update.take() { |
| if let Err(e) = tx.send(Ok(update)).await { |
| eprintln!("Error sending update: {:?}", e); |
| return Err(Status::internal("Failed to send update")); |
| } |
| } |
| } |
| Ok(()) |
| } |
| |
| /// Helper function, creates subscription parameters from a `RequestFqp`. |
| /// |
| /// # Arguments |
| /// |
| /// * `req_fqp` - The `RequestFqp` to create parameters from. |
| /// |
| /// # Returns |
| /// |
| /// A `SubscriptionType` struct containing the subscription parameters on success or Status on |
| /// error. |
| fn get_subscription_type(req_fqp: &RequestFqp) -> Result<SubscriptionType, Status> { |
| let subscription_type = match req_fqp.mode { |
| 1 => SubscriptionType::OnChange, |
| 2 => SubscriptionType::Periodical( |
| Duration::from_nanos(req_fqp.sample_frequency_expect_ns), |
| Duration::from_nanos(req_fqp.export_frequency_ns), |
| ), |
| 3 => SubscriptionType::Periodical( |
| // TODO: get default polling parameters from server_config |
| Duration::from_secs(1), |
| Duration::from_secs(10), |
| ), |
| _ => return Err(Status::invalid_argument("Unsupported sampling mode")), |
| }; |
| Ok(subscription_type) |
| } |
| |
| /// Retrieves selected FQPs and their associated thresholds from the server configuration. |
| /// |
| /// # Arguments |
| /// |
| /// * `server_config` - The server configuration. |
| /// * `req_config_group` - The requested configuration group name. |
| /// |
| /// # Returns |
| /// |
| /// A vector of tuples containing the selected FQPs and their thresholds. |
| async fn get_selected_fqps( |
| server_config: &Arc<tokio::sync::RwLock<ProstServerConfig>>, |
| req_config_group: &str, |
| ) -> Vec<(String, Option<Threshold>)> { |
| let server_config = server_config.read().await; |
| let mut result = Vec::new(); |
| |
| // Step 1: Get the ConfigGroup from the top-level map |
| if let Some(config_group) = server_config.cfg_groups.get(req_config_group) { |
| // Step 2: Iterate through req_fqp_names in the ConfigGroup |
| for req_fqp_name in &config_group.req_fqp_names { |
| // Step 3: Get the ReqFqpConfig from the second-level map |
| if let Some(req_fqp_config) = server_config.req_fqp_configs.get(req_fqp_name) { |
| // Step 4: Iterate through RequestFqp in ReqFqpConfig |
| for req_fqp in &req_fqp_config.req_fqp { |
| if let Some(fqp) = &req_fqp.fqp { |
| let specifier = fqp.specifier.clone(); |
| |
| // Step 5: Get the matching Threshold from the threshold_config map |
| let threshold = |
| if let Some(request_fqp::Config::ThresholdConfig(threshold_config)) = |
| &req_fqp.config |
| { |
| server_config |
| .threshold_config |
| .get(&req_fqp.req_fqp_name) |
| .and_then(|thresholds| { |
| thresholds |
| .threshold |
| .iter() |
| .find(|t| t.name == *threshold_config) |
| .cloned() |
| }) |
| } else { |
| None |
| }; |
| |
| result.push((specifier, threshold)); |
| } |
| } |
| } |
| } |
| } |
| |
| result |
| } |
| |
| /// Handles requests for a specific configuration group. |
| /// |
| /// # Arguments |
| /// |
| /// * `state` - The shared application state. |
| /// * `server_config` - The server configuration. |
| /// * `req` - The telemetry request. |
| /// * `tx` - The channel sender for updates. |
| /// |
| /// # Returns |
| /// |
| /// A `Result` indicating success or containing a `Status` error. |
| async fn handle_config_group( |
| state: Arc<AppState>, |
| server_config: Arc<tokio::sync::RwLock<ProstServerConfig>>, |
| req: TelemetryRequest, |
| tx: mpsc::Sender<Result<Update, Status>>, |
| ) -> Result<(), Status> { |
| let selected = get_selected_fqps(&server_config, &req.req_config_group).await; |
| println!( |
| "handle_config_group {} selected Fqp and Theshold: {:#?}", |
| &req.req_config_group, selected |
| ); |
| let urls = selected.iter().map(|(string, _)| string.clone()).collect(); |
| let subscription_type = SubscriptionType::Periodical( |
| // TODO: get default polling parameters from server_config |
| Duration::from_secs(1), |
| Duration::from_secs(10), |
| ); |
| create_response_stream( |
| state, |
| req.req_id, |
| subscription_type, |
| urls, |
| server_config, |
| tx, |
| ) |
| .await |
| } |
| |
| /// Handles a single FQP (Fully Qualified Path) request. |
| /// |
| /// # Arguments |
| /// |
| /// * `state` - The shared application state. |
| /// * `server_config` - The server configuration. |
| /// * `req_fqp` - The FQP request. |
| /// * `req_id` - The request ID. |
| /// * `tx` - The channel sender for updates. |
| /// |
| /// # Returns |
| /// |
| /// A `Result` indicating success or containing a `Status` error. |
| async fn handle_fqp( |
| state: Arc<AppState>, |
| server_config: Arc<tokio::sync::RwLock<ProstServerConfig>>, |
| req_fqp: &RequestFqp, |
| req_id: String, |
| tx: mpsc::Sender<Result<Update, Status>>, |
| ) -> Result<(), Status> { |
| let fqp = match &req_fqp.fqp { |
| Some(fqp) => fqp, |
| None => return Err(Status::invalid_argument("Address not specified")), |
| }; |
| let identifiers: HashMap<String, Vec<String>> = fqp |
| .identifiers |
| .iter() |
| .map(|(k, v)| (k.clone(), vec![v.clone()])) |
| .collect(); |
| |
| // TODO: need adapt to proto encoded filters |
| |
| let segments: Vec<&str> = fqp.specifier.split('/').collect(); |
| let urls = match get_xpath_urls(&state, &identifiers, &segments).await { |
| Ok(urls) => urls, |
| Err(_) => return Err(Status::internal("Failed to get XPath URLs")), |
| }; |
| |
| let subscription_type = get_subscription_type(req_fqp)?; |
| create_response_stream(state, req_id, subscription_type, urls, server_config, tx).await |
| } |
| |
| /// Handles FqpType::RedfishResource request, when Fqp.specifier represents an odata.type, |
| /// and the Fqp.specifier is used to select all sensors when it's "*Sensor". |
| /// |
| /// # Arguments |
| /// |
| /// * `state` - The shared application state. |
| /// * `server_config` - The server configuration. |
| /// * `req_fqp` - The FQP request. |
| /// * `req_id` - The request ID. |
| /// * `tx` - The channel sender for updates. |
| /// |
| /// # Returns |
| /// |
| /// A `Result` indicating success or containing a `Status` error. |
| async fn handle_sensor_request( |
| state: Arc<AppState>, |
| server_config: Arc<tokio::sync::RwLock<ProstServerConfig>>, |
| req_fqp: &RequestFqp, |
| req_id: String, |
| tx: mpsc::Sender<Result<Update, Status>>, |
| ) -> Result<(), Status> { |
| // Use fixed identifiers |
| let identifiers: HashMap<String, Vec<String>> = HashMap::from([ |
| ("ChassisId".to_string(), vec!["*".to_string()]), |
| ("SensorId".to_string(), vec!["*".to_string()]), |
| ]); |
| let segments: Vec<&str> = "/redfish/v1/Chassis/{ChassisId}/Sensors/{SensorId}" |
| .split('/') |
| .collect(); |
| |
| let urls = match get_xpath_urls(&state, &identifiers, &segments).await { |
| Ok(urls) => urls, |
| Err(_) => return Err(Status::internal("Failed to get XPath URLs")), |
| }; |
| |
| let subscription_type = get_subscription_type(req_fqp)?; |
| create_response_stream(state, req_id, subscription_type, urls, server_config, tx).await |
| } |
| |
| #[tonic::async_trait] |
| impl MachineTelemetry for BmcTelemetryService { |
| type SubscribeV2Stream = ReceiverStream<Result<Update, Status>>; |
| type SubscribeStream = Pin<Box<dyn Stream<Item = Result<Update, Status>> + Send + 'static>>; |
| |
| /// Handles subscription requests and sets up a stream of telemetry updates. |
| /// |
| /// # Arguments |
| /// |
| /// * `request` - The incoming subscription request. |
| /// |
| /// # Returns |
| /// |
| /// A `Result` containing a `Response` with the subscription stream or a `Status` error. |
| async fn subscribe_v2( |
| &self, |
| request: Request<tonic::Streaming<TelemetryRequest>>, |
| ) -> Result<Response<Self::SubscribeV2Stream>, Status> { |
| let mut stream = request.into_inner(); |
| let state = self.state.clone(); |
| let server_config = self.server_config.clone(); |
| |
| let (tx, rx) = mpsc::channel(16); // Adjust channel size as needed |
| |
| tokio::spawn(async move { |
| while let Some(req) = match stream.message().await { |
| Ok(req) => req, |
| Err(e) => { |
| eprintln!("Error receiving stream message: {:?}", e); |
| return; |
| } |
| } { |
| if !req.req_config_group.is_empty() { |
| // Client subscribe by a server config name, like |
| // TelemetryRequest { |
| // req_id: "req_repairability".into(), |
| // req_config_group: "repairability_basic_cfg_group".into(), |
| // ..Default::default() |
| // }; |
| if let Err(e) = |
| handle_config_group(state.clone(), server_config.clone(), req, tx.clone()) |
| .await |
| { |
| eprintln!("Handler error: {:?}", e); |
| } |
| continue; |
| } |
| for req_fqp in &req.req_fqp { |
| if let Some(fqp) = &req_fqp.fqp { |
| println!("fqp: {:?}", fqp); |
| match FqpType::try_from(fqp.r#type) { |
| Ok(fqp_type) => { |
| match fqp_type { |
| FqpType::NotSet => { |
| // Client subscribe by a Fqp, with wildcard match to select |
| // all sensors on all chassis: |
| // Fqp { |
| // specifier: "/redfish/v1/Chassis/{ChassisId}/Sensors/{SensorId}".into(), |
| // identifiers: HashMap::from([ |
| // ("ChassisId".into(), "*".into()), |
| // ("SensorId".into(), "*".into()), |
| // ]), |
| // r#type: FqpType::NotSet as i32, |
| // ..Default::default() |
| // }; |
| if let Err(e) = handle_fqp( |
| state.clone(), |
| server_config.clone(), |
| req_fqp, |
| req.req_id.clone(), |
| tx.clone(), |
| ) |
| .await |
| { |
| eprintln!("Handler error: {:?}", e); |
| } |
| } |
| FqpType::RedfishResource => { |
| // Client subscribe by a Fqp, with Redfish odata.type to |
| // select all resouces has that type: |
| // Fqp { |
| // specifier: ""#Sensor.v1_2_0.Sensor"".into(), |
| // r#type: FqpType::RedfishResource as i32, |
| // ..Default::default() |
| // }; |
| if fqp.specifier.contains("Sensor") { |
| if let Err(e) = handle_sensor_request( |
| state.clone(), |
| server_config.clone(), |
| req_fqp, |
| req.req_id.clone(), |
| tx.clone(), |
| ) |
| .await |
| { |
| eprintln!("Handler error: {:?}", e); |
| } |
| } else { |
| eprintln!("Subscribe by RedfishResource for {} not implemented", fqp.specifier); |
| } |
| } |
| } |
| } |
| Err(e) => { |
| eprintln!("Invalid FqpType value: {}. Error: {:?}", fqp.r#type, e); |
| continue; |
| } |
| } |
| } |
| } |
| } |
| }); |
| |
| Ok(Response::new(ReceiverStream::new(rx))) |
| } |
| |
| async fn get(&self, _: Request<TelemetryRequest>) -> Result<Response<Update>, Status> { |
| Err(Status::new( |
| Code::Internal, |
| "gRPC method not implemented".to_string(), |
| )) |
| } |
| |
| async fn put(&self, _: Request<SetRequest>) -> Result<Response<Update>, Status> { |
| Err(Status::new( |
| Code::Internal, |
| "gRPC method not implemented".to_string(), |
| )) |
| } |
| |
| async fn post(&self, _: Request<SetRequest>) -> Result<Response<Update>, Status> { |
| Err(Status::new( |
| Code::Internal, |
| "gRPC method not implemented".to_string(), |
| )) |
| } |
| |
| async fn patch(&self, _: Request<SetRequest>) -> Result<Response<Update>, Status> { |
| Err(Status::new( |
| Code::Internal, |
| "gRPC method not implemented".to_string(), |
| )) |
| } |
| |
| async fn delete(&self, _: Request<SetRequest>) -> Result<Response<Update>, Status> { |
| Err(Status::new( |
| Code::Internal, |
| "gRPC method not implemented".to_string(), |
| )) |
| } |
| |
| async fn subscribe( |
| &self, |
| _: Request<TelemetryRequest>, |
| ) -> Result<Response<Self::SubscribeStream>, Status> { |
| Err(Status::new( |
| Code::Internal, |
| "gRPC method not implemented".to_string(), |
| )) |
| } |
| } |