blob: 5ee8c087fcf9046abbe2fb3ddbfcaa52c6c00406 [file] [log] [blame]
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! BMC Telemetry Service: an implementation of the Voyager telemetry gRPC API.
//!
//! This module provides a gRPC-based telemetry service for Baseboard Management Controllers (BMCs).
//! It handles subscription requests, processes telemetry data, and streams updates to clients.
use crate::app_state::AppState;
use futures::StreamExt;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::Stream;
use tonic::{Code, Request, Response, Status};
use crate::grpc::third_party_voyager::{
machine_telemetry_server::MachineTelemetry, request_fqp, DataPoint, FqpType,
Request as TelemetryRequest, RequestFqp, SetRequest, TypedStruct, TypedValue, Update,
};
use crate::handlers::xpath::get_xpath_urls;
use crate::telemetry_source_manager::telemetry_source_manager::{SubscriptionType, TypedTelemetry};
use crate::telemetry_source_manager::telemetry_source_manager_api::handle_subscribe_inner;
use super::third_party_voyager;
use crate::grpc::server_config::voyager_server_config::ServerConfig as ProtobufServerConfig;
use prost::Message;
use protobuf::text_format::parse_from_str;
use protobuf::Message as ProtobufMessage;
use third_party_voyager::ServerConfig as ProstServerConfig;
use third_party_voyager::{typed_value, Threshold, Thresholds};
/// Represents the BMC Telemetry Service.
pub struct BmcTelemetryService {
/// Shared application state.
pub state: Arc<AppState>,
/// Server configuration, shared and protected by a read-write lock.
pub server_config: Arc<tokio::sync::RwLock<ProstServerConfig>>,
}
/// Loads the server configuration from a textproto file.
///
/// `prost` cannot parse the protobuf text format, so the file is first parsed with the
/// `protobuf` crate into its `ServerConfig` message (from the `voyager_server_config` proto).
/// That message is then serialized to the binary wire format and decoded again with `prost`.
/// The result is the `prost`-generated `ServerConfig` that the gRPC service uses.
///
/// # Arguments
///
/// * `file_path` - The path to a textproto file containing a `ServerConfig` message.
///
/// # Returns
///
/// The parsed `prost` `ServerConfig`.
///
/// # Errors
///
/// Returns an error if:
/// * the file cannot be read,
/// * its contents are not valid textproto for `ServerConfig`, or
/// * re-encoding or decoding during the `protobuf` -> `prost` conversion fails.
///
/// Previously the last two failure modes panicked.
pub fn load_server_config(
    file_path: &str,
) -> Result<ProstServerConfig, Box<dyn std::error::Error>> {
    let content = std::fs::read_to_string(file_path)?;
    // Use the protobuf crate to parse the textproto string into its ServerConfig object.
    let config: ProtobufServerConfig = parse_from_str(&content)?;
    // Encode the protobuf-crate object into the binary wire format shared by both crates.
    let mut binary_data = Vec::new();
    config.write_to_vec(&mut binary_data)?;
    // Decode the wire bytes with prost to obtain the prost-crate message.
    let config = ProstServerConfig::decode(binary_data.as_slice())?;
    println!("Loaded ServerConfig: {:#?}", config);
    Ok(config)
}
/// Extracts the `f64` payload of a `TypedValue`.
///
/// # Arguments
///
/// * `typed_value` - The value to inspect.
///
/// # Returns
///
/// `Some(v)` when the value is set and holds the `DoubleVal` variant. Otherwise `None`, which
/// covers an unset value and every other variant.
fn get_double_value(typed_value: &TypedValue) -> Option<f64> {
    match &typed_value.value {
        Some(typed_value::Value::DoubleVal(double)) => Some(*double),
        _ => None,
    }
}
/// Determines the threshold entry that applies to a reading.
///
/// The entries in `config.threshold` are scanned in order, starting from the first. The scan
/// moves on to the next entry while both of these hold:
/// * `reading` is strictly greater than the current entry's `cross_above_value` (when that
///   value is set and is a double), and
/// * a next entry exists.
///
/// The first entry where that does not hold is returned, or the last entry if the scan reaches
/// the end.
///
/// `cross_below_value` is deliberately not consulted. The scan is stateless and always starts
/// at index 0, so stepping back down to an entry it already passed would repeat the same
/// upward step and never terminate. Overlapping bands trigger this: a level whose
/// `cross_below_value` is higher than the previous level's `cross_above_value`.
///
/// A walk that honoured `cross_below_value` could only return a result when no downward step
/// occurred. On all such inputs this scan returns the same entry. Inputs that would have
/// looped forever now resolve to the highest entry reached.
///
/// # Arguments
///
/// * `reading` - The current sensor reading. A NaN reading never crosses above, so the first
///   entry is returned.
/// * `config` - The threshold configuration.
///
/// # Returns
///
/// `None` only when `config` has no thresholds. Otherwise a reference to the selected
/// `Threshold`.
pub fn get_threshold_config(reading: f64, config: &Thresholds) -> Option<&Threshold> {
    let thresholds = &config.threshold;
    // Empty configuration: nothing to select.
    let last_index = thresholds.len().checked_sub(1)?;
    thresholds.iter().enumerate().find_map(|(index, threshold)| {
        let crossed_above = threshold
            .cross_above_value
            .as_ref()
            .and_then(get_double_value)
            .map_or(false, |above| reading > above);
        // Keep climbing while the reading exceeds this level and a higher level exists;
        // the last entry always terminates the scan.
        if crossed_above && index < last_index {
            None
        } else {
            Some(threshold)
        }
    })
}
/// Converts one `TypedTelemetry<f64>` sample into a key/value `DataPoint`.
///
/// Fields are emitted relative to `predecessor`, the sample that precedes `event` in the same
/// batch:
/// * `@odata.id` (the event's `source_name`) only when the source differs from the predecessor;
/// * `SensorValue` only when the value differs from the predecessor;
/// * `Status.Health` (always `"OK"`) whenever either of the above is emitted.
///
/// With no predecessor, every field is emitted. When nothing changed, the data point carries
/// only its timestamp and an empty field map. Timestamps earlier than the Unix epoch are
/// reported as 0.
///
/// # Arguments
///
/// * `event` - The sample to convert.
/// * `predecessor` - The sample emitted just before `event` in the same batch, if any.
///
/// # Returns
///
/// The resulting `DataPoint`. The current implementation never returns `Err`.
fn create_sensor_data_point(
    event: &TypedTelemetry<f64>,
    predecessor: Option<&TypedTelemetry<f64>>,
) -> Result<DataPoint, anyhow::Error> {
    let since_epoch = event
        .timestamp
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    let timestamp_ns = since_epoch.as_nanos() as u64;

    let source_changed = match predecessor {
        Some(previous) => previous.source_name != event.source_name,
        None => true,
    };
    let value_changed = match predecessor {
        Some(previous) => previous.value != event.value,
        None => true,
    };

    let mut fields = HashMap::new();
    if source_changed {
        let odata_id = typed_value::Value::StringVal(event.source_name.clone());
        fields.insert("@odata.id".to_string(), TypedValue { value: Some(odata_id) });
    }
    if value_changed {
        let reading = typed_value::Value::DoubleVal(event.value);
        fields.insert("SensorValue".to_string(), TypedValue { value: Some(reading) });
    }
    // Without a predecessor both flags are already true, so the first sample is covered here.
    if source_changed || value_changed {
        let health = typed_value::Value::StringVal("OK".to_owned());
        fields.insert("Status.Health".to_string(), TypedValue { value: Some(health) });
    }

    let payload = third_party_voyager::data_point::Data::KeyValue(TypedStruct { fields });
    Ok(DataPoint {
        timestamp_ns,
        data: Some(payload),
        ..Default::default()
    })
}
/// Subscribes to telemetry for `urls` and forwards each batch to the client as one `Update`.
///
/// Every non-empty batch from `handle_subscribe_inner` is turned into a single `Update` tagged
/// with `req_id`. Each event becomes one data point, encoded relative to the event before it
/// in the same batch. Empty batches produce no update.
///
/// # Arguments
///
/// * `state` - The shared application state.
/// * `req_id` - The request ID copied into every `Update`.
/// * `subscription_type` - How the telemetry sources should be sampled.
/// * `urls` - The Redfish URLs of the telemetry events to subscribe to.
/// * `server_config` - The server configuration, passed through to the subscription.
/// * `tx` - The channel feeding the client's response stream.
///
/// # Returns
///
/// `Ok(())` once the subscription stream ends.
///
/// # Errors
///
/// Returns `Status::internal` if a data point cannot be built, or if the channel is closed
/// (for example because the client went away).
async fn create_response_stream(
    state: Arc<AppState>,
    req_id: String,
    subscription_type: SubscriptionType,
    urls: Vec<String>,
    server_config: Arc<tokio::sync::RwLock<ProstServerConfig>>,
    tx: mpsc::Sender<Result<Update, Status>>,
) -> Result<(), Status> {
    let mut batches =
        handle_subscribe_inner(state, subscription_type, urls, Some(server_config)).await;
    while let Some(batch) = batches.next().await {
        let mut data_points = Vec::new();
        let mut previous: Option<TypedTelemetry<f64>> = None;
        for event in batch {
            let point = create_sensor_data_point(&event, previous.as_ref()).map_err(|e| {
                Status::internal(format!("Failed to create sensor data point: {}", e))
            })?;
            data_points.push(point);
            previous = Some(event);
        }
        if data_points.is_empty() {
            continue;
        }
        let update = Update {
            req_id: req_id.clone(),
            data_points,
            ..Default::default()
        };
        if let Err(e) = tx.send(Ok(update)).await {
            eprintln!("Error sending update: {:?}", e);
            return Err(Status::internal("Failed to send update"));
        }
    }
    Ok(())
}
/// Maps the numeric sampling `mode` of a `RequestFqp` to a `SubscriptionType`.
///
/// * `1` - on-change subscription.
/// * `2` - periodic, using the request's `sample_frequency_expect_ns` and
///   `export_frequency_ns` (nanoseconds) as the two periods.
/// * `3` - periodic with fixed defaults of 1 s and 10 s.
///
/// # Arguments
///
/// * `req_fqp` - The request whose mode and frequencies are read.
///
/// # Errors
///
/// Returns `Status::invalid_argument` for any other mode value.
fn get_subscription_type(req_fqp: &RequestFqp) -> Result<SubscriptionType, Status> {
    match req_fqp.mode {
        1 => Ok(SubscriptionType::OnChange),
        2 => {
            let sample_period = Duration::from_nanos(req_fqp.sample_frequency_expect_ns);
            let export_period = Duration::from_nanos(req_fqp.export_frequency_ns);
            Ok(SubscriptionType::Periodical(sample_period, export_period))
        }
        // TODO: get default polling parameters from server_config
        3 => Ok(SubscriptionType::Periodical(
            Duration::from_secs(1),
            Duration::from_secs(10),
        )),
        _ => Err(Status::invalid_argument("Unsupported sampling mode")),
    }
}
/// Collects the FQP specifiers, and their thresholds, that belong to a configuration group.
///
/// Each step is a lookup in the server configuration:
/// 1. The group is looked up in `cfg_groups`.
/// 2. Each of its `req_fqp_names` is resolved in `req_fqp_configs`. Unknown names are skipped.
/// 3. Every `RequestFqp` that has an `fqp` contributes its `specifier`.
/// 4. When that request carries a `ThresholdConfig` name, the threshold is looked up in
///    `threshold_config` under the request's `req_fqp_name`. The entry whose `name` matches the
///    configured name is taken; otherwise the threshold is `None`.
///
/// The configuration read lock is held for the duration of the lookup.
///
/// # Arguments
///
/// * `server_config` - The server configuration.
/// * `req_config_group` - The requested configuration group name.
///
/// # Returns
///
/// `(specifier, threshold)` pairs in configuration order. The vector is empty if the group
/// does not exist.
async fn get_selected_fqps(
    server_config: &Arc<tokio::sync::RwLock<ProstServerConfig>>,
    req_config_group: &str,
) -> Vec<(String, Option<Threshold>)> {
    let cfg = server_config.read().await;
    let group = match cfg.cfg_groups.get(req_config_group) {
        Some(group) => group,
        None => return Vec::new(),
    };

    let mut selected = Vec::new();
    let fqp_configs = group
        .req_fqp_names
        .iter()
        .filter_map(|name| cfg.req_fqp_configs.get(name));
    for fqp_config in fqp_configs {
        for req_fqp in &fqp_config.req_fqp {
            let fqp = match &req_fqp.fqp {
                Some(fqp) => fqp,
                None => continue,
            };
            let threshold = match &req_fqp.config {
                Some(request_fqp::Config::ThresholdConfig(wanted)) => cfg
                    .threshold_config
                    .get(&req_fqp.req_fqp_name)
                    .and_then(|levels| {
                        levels
                            .threshold
                            .iter()
                            .find(|level| level.name == *wanted)
                            .cloned()
                    }),
                _ => None,
            };
            selected.push((fqp.specifier.clone(), threshold));
        }
    }
    selected
}
/// Handles requests for a specific configuration group.
///
/// # Arguments
///
/// * `state` - The shared application state.
/// * `server_config` - The server configuration.
/// * `req` - The telemetry request.
/// * `tx` - The channel sender for updates.
///
/// # Returns
///
/// A `Result` indicating success or containing a `Status` error.
async fn handle_config_group(
state: Arc<AppState>,
server_config: Arc<tokio::sync::RwLock<ProstServerConfig>>,
req: TelemetryRequest,
tx: mpsc::Sender<Result<Update, Status>>,
) -> Result<(), Status> {
let selected = get_selected_fqps(&server_config, &req.req_config_group).await;
println!(
"handle_config_group {} selected Fqp and Theshold: {:#?}",
&req.req_config_group, selected
);
let urls = selected.iter().map(|(string, _)| string.clone()).collect();
let subscription_type = SubscriptionType::Periodical(
// TODO: get default polling parameters from server_config
Duration::from_secs(1),
Duration::from_secs(10),
);
create_response_stream(
state,
req.req_id,
subscription_type,
urls,
server_config,
tx,
)
.await
}
/// Serves a subscription for a single FQP (Fully Qualified Path) given as an xpath template.
///
/// Each identifier of the FQP becomes a one-element value list. The specifier is split on
/// `/` into path segments and expanded to concrete URLs by `get_xpath_urls`. The subscription
/// is then streamed using the request's sampling mode.
///
/// # Arguments
///
/// * `state` - The shared application state.
/// * `server_config` - The server configuration.
/// * `req_fqp` - The FQP request.
/// * `req_id` - The request ID copied into every update.
/// * `tx` - The channel feeding the client's response stream.
///
/// # Errors
///
/// * `Status::invalid_argument` when `req_fqp.fqp` is missing or the sampling mode is
///   unsupported.
/// * `Status::internal` when URL expansion fails.
/// * Anything returned by `create_response_stream`.
async fn handle_fqp(
    state: Arc<AppState>,
    server_config: Arc<tokio::sync::RwLock<ProstServerConfig>>,
    req_fqp: &RequestFqp,
    req_id: String,
    tx: mpsc::Sender<Result<Update, Status>>,
) -> Result<(), Status> {
    let fqp = req_fqp
        .fqp
        .as_ref()
        .ok_or_else(|| Status::invalid_argument("Address not specified"))?;
    let mut identifiers: HashMap<String, Vec<String>> = HashMap::new();
    for (key, value) in &fqp.identifiers {
        identifiers.insert(key.clone(), vec![value.clone()]);
    }
    // TODO: need adapt to proto encoded filters
    let segments: Vec<&str> = fqp.specifier.split('/').collect();
    let urls = get_xpath_urls(&state, &identifiers, &segments)
        .await
        .map_err(|_| Status::internal("Failed to get XPath URLs"))?;
    let subscription_type = get_subscription_type(req_fqp)?;
    create_response_stream(state, req_id, subscription_type, urls, server_config, tx).await
}
/// Serves an `FqpType::RedfishResource` subscription for sensors.
///
/// The request's specifier is not consulted here; the caller routes sensor-type requests to
/// this function. Every sensor of every chassis is selected by expanding the fixed template
/// `/redfish/v1/Chassis/{ChassisId}/Sensors/{SensorId}` with `*` for both identifiers. The
/// resulting URLs are streamed using the request's sampling mode.
///
/// # Arguments
///
/// * `state` - The shared application state.
/// * `server_config` - The server configuration.
/// * `req_fqp` - The FQP request (only its sampling parameters are used).
/// * `req_id` - The request ID copied into every update.
/// * `tx` - The channel feeding the client's response stream.
///
/// # Errors
///
/// * `Status::internal` when URL expansion fails.
/// * `Status::invalid_argument` for an unsupported sampling mode.
/// * Anything returned by `create_response_stream`.
async fn handle_sensor_request(
    state: Arc<AppState>,
    server_config: Arc<tokio::sync::RwLock<ProstServerConfig>>,
    req_fqp: &RequestFqp,
    req_id: String,
    tx: mpsc::Sender<Result<Update, Status>>,
) -> Result<(), Status> {
    const SENSOR_XPATH: &str = "/redfish/v1/Chassis/{ChassisId}/Sensors/{SensorId}";
    // Use fixed wildcard identifiers.
    let mut identifiers: HashMap<String, Vec<String>> = HashMap::new();
    for key in ["ChassisId", "SensorId"] {
        identifiers.insert(key.to_string(), vec!["*".to_string()]);
    }
    let segments: Vec<&str> = SENSOR_XPATH.split('/').collect();
    let urls = get_xpath_urls(&state, &identifiers, &segments)
        .await
        .map_err(|_| Status::internal("Failed to get XPath URLs"))?;
    let subscription_type = get_subscription_type(req_fqp)?;
    create_response_stream(state, req_id, subscription_type, urls, server_config, tx).await
}
#[tonic::async_trait]
impl MachineTelemetry for BmcTelemetryService {
type SubscribeV2Stream = ReceiverStream<Result<Update, Status>>;
type SubscribeStream = Pin<Box<dyn Stream<Item = Result<Update, Status>> + Send + 'static>>;
/// Handles subscription requests and sets up a stream of telemetry updates.
///
/// # Arguments
///
/// * `request` - The incoming subscription request.
///
/// # Returns
///
/// A `Result` containing a `Response` with the subscription stream or a `Status` error.
async fn subscribe_v2(
&self,
request: Request<tonic::Streaming<TelemetryRequest>>,
) -> Result<Response<Self::SubscribeV2Stream>, Status> {
let mut stream = request.into_inner();
let state = self.state.clone();
let server_config = self.server_config.clone();
let (tx, rx) = mpsc::channel(16); // Adjust channel size as needed
tokio::spawn(async move {
while let Some(req) = match stream.message().await {
Ok(req) => req,
Err(e) => {
eprintln!("Error receiving stream message: {:?}", e);
return;
}
} {
if !req.req_config_group.is_empty() {
// Client subscribe by a server config name, like
// TelemetryRequest {
// req_id: "req_repairability".into(),
// req_config_group: "repairability_basic_cfg_group".into(),
// ..Default::default()
// };
if let Err(e) =
handle_config_group(state.clone(), server_config.clone(), req, tx.clone())
.await
{
eprintln!("Handler error: {:?}", e);
}
continue;
}
for req_fqp in &req.req_fqp {
if let Some(fqp) = &req_fqp.fqp {
println!("fqp: {:?}", fqp);
match FqpType::try_from(fqp.r#type) {
Ok(fqp_type) => {
match fqp_type {
FqpType::NotSet => {
// Client subscribe by a Fqp, with wildcard match to select
// all sensors on all chassis:
// Fqp {
// specifier: "/redfish/v1/Chassis/{ChassisId}/Sensors/{SensorId}".into(),
// identifiers: HashMap::from([
// ("ChassisId".into(), "*".into()),
// ("SensorId".into(), "*".into()),
// ]),
// r#type: FqpType::NotSet as i32,
// ..Default::default()
// };
if let Err(e) = handle_fqp(
state.clone(),
server_config.clone(),
req_fqp,
req.req_id.clone(),
tx.clone(),
)
.await
{
eprintln!("Handler error: {:?}", e);
}
}
FqpType::RedfishResource => {
// Client subscribe by a Fqp, with Redfish odata.type to
// select all resouces has that type:
// Fqp {
// specifier: ""#Sensor.v1_2_0.Sensor"".into(),
// r#type: FqpType::RedfishResource as i32,
// ..Default::default()
// };
if fqp.specifier.contains("Sensor") {
if let Err(e) = handle_sensor_request(
state.clone(),
server_config.clone(),
req_fqp,
req.req_id.clone(),
tx.clone(),
)
.await
{
eprintln!("Handler error: {:?}", e);
}
} else {
eprintln!("Subscribe by RedfishResource for {} not implemented", fqp.specifier);
}
}
}
}
Err(e) => {
eprintln!("Invalid FqpType value: {}. Error: {:?}", fqp.r#type, e);
continue;
}
}
}
}
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn get(&self, _: Request<TelemetryRequest>) -> Result<Response<Update>, Status> {
Err(Status::new(
Code::Internal,
"gRPC method not implemented".to_string(),
))
}
async fn put(&self, _: Request<SetRequest>) -> Result<Response<Update>, Status> {
Err(Status::new(
Code::Internal,
"gRPC method not implemented".to_string(),
))
}
async fn post(&self, _: Request<SetRequest>) -> Result<Response<Update>, Status> {
Err(Status::new(
Code::Internal,
"gRPC method not implemented".to_string(),
))
}
async fn patch(&self, _: Request<SetRequest>) -> Result<Response<Update>, Status> {
Err(Status::new(
Code::Internal,
"gRPC method not implemented".to_string(),
))
}
async fn delete(&self, _: Request<SetRequest>) -> Result<Response<Update>, Status> {
Err(Status::new(
Code::Internal,
"gRPC method not implemented".to_string(),
))
}
async fn subscribe(
&self,
_: Request<TelemetryRequest>,
) -> Result<Response<Self::SubscribeStream>, Status> {
Err(Status::new(
Code::Internal,
"gRPC method not implemented".to_string(),
))
}
}