| // Copyright 2025 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| use anyhow::{anyhow, Result}; |
| use chrono::{DateTime, NaiveDateTime, Utc}; |
| use clap::{arg, Command}; |
| use futures_util::stream::StreamExt; |
| #[cfg(feature = "client_debug")] |
| use redfish_codegen::models::sensor::v1_7_0::Sensor as SensorModel; |
| #[cfg(feature = "client_debug")] |
| use reqwest::Client; |
| #[cfg(feature = "client_debug")] |
| use serde_json::Value; |
| #[cfg(feature = "client_debug")] |
| use std::collections::HashMap; |
| use std::fs::File; |
| use std::io::Read; |
| use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity}; |
| use tonic::Request; |
| |
| mod app_state; |
| mod composite_query; |
| mod dbus_client; |
| mod grpc; |
| mod handlers; |
| mod telemetry_source_manager; |
| |
| #[cfg(feature = "client_debug")] |
| use crate::grpc::third_party_voyager::{Fqp, FqpType, RequestFqp}; |
| |
| use crate::grpc::third_party_voyager::{ |
| data_point, machine_telemetry_client::MachineTelemetryClient, typed_value, |
| Request as TelemetryRequest, Update, |
| }; |
| |
| fn format_timestamp(timestamp_ns: u64) -> String { |
| // Convert nanoseconds to seconds and nanoseconds |
| let seconds = (timestamp_ns / 1_000_000_000) as i64; |
| let nanoseconds = (timestamp_ns % 1_000_000_000) as u32; |
| |
| // Create a NaiveDateTime from the timestamp |
| let naive = NaiveDateTime::from_timestamp_opt(seconds, nanoseconds).expect("Invalid timestamp"); |
| |
| // Convert to DateTime<Utc> |
| let datetime: DateTime<Utc> = DateTime::from_utc(naive, Utc); |
| |
| // Format the datetime |
| datetime.format("%Y:%m:%d:%H:%M:%S.%3f").to_string() |
| } |
| |
| async fn print_datapoint(update: &Update) -> Result<()> { |
| for (index, datapoint) in update.data_points.iter().enumerate() { |
| if let Some(data_point::Data::KeyValue(typed_struct)) = &datapoint.data { |
| print!("{index}th datapoint, "); |
| match typed_struct |
| .fields |
| .get("@odata.id") |
| .and_then(|v| v.value.as_ref()) |
| { |
| Some(typed_value::Value::StringVal(id)) => print!("@odata.id: {}, ", id), |
| None => print!("@odata.id not changed, "), |
| _ => print!("@odata.id invalid type, "), |
| } |
| |
| match typed_struct |
| .fields |
| .get("SensorValue") |
| .and_then(|v| v.value.as_ref()) |
| { |
| Some(typed_value::Value::DoubleVal(value)) => print!("value: {}, ", value), |
| None => print!("value not changed, "), |
| _ => print!("value invalid type, "), |
| } |
| |
| #[cfg(feature = "client_debug")] |
| { |
| // Fetch data from local server |
| let url = format!("http://localhost:8088{}", odata_id); |
| let client = Client::new(); |
| let response = client.get(&url).send().await?; |
| let body = response.text().await?; |
| let sensor: SensorModel = serde_json::from_str(&body)?; |
| |
| if let (Some(reading), Some(min), Some(max)) = ( |
| sensor.reading, |
| sensor.reading_range_min, |
| sensor.reading_range_max, |
| ) { |
| let corrected_reading = reading.max(min).min(max); |
| println!(" Fetched Reading: {}", corrected_reading); |
| |
| // Compare readings |
| let difference = (sensor_value - corrected_reading).abs(); |
| let threshold = corrected_reading * 0.05; // 5% of the reading |
| if difference > threshold { |
| eprintln!("Error: sensor {odata_id} value {sensor_value} is out of 5% range of fetched Reading {corrected_reading}"); |
| return Err(anyhow!("Value not matching Redfish query result")); |
| } |
| } else { |
| return Err(anyhow!("Failed to get Reading from fetched data")); |
| } |
| } |
| } |
| println!("timestamp: {}", format_timestamp(datapoint.timestamp_ns)); |
| } |
| println!(); |
| |
| Ok(()) |
| } |
| |
| fn cli() -> Command { |
| Command::new("my_client") |
| .about("Starts a client with the specified configuration") |
| .arg( |
| arg!(-i --insecure "Disables SSL certificate verification") |
| .required(false) |
| .default_value("false") |
| .action(clap::ArgAction::SetTrue), |
| ) |
| .arg( |
| arg!(-k --key <KEY> "Server's private key file name") |
| .required_if_eq("insecure", "false") |
| .value_parser(clap::value_parser!(String)), |
| ) |
| .arg( |
| arg!(-c --cert <CERT> "Server's certificate file name") |
| .required_if_eq("insecure", "false") |
| .value_parser(clap::value_parser!(String)), |
| ) |
| .arg( |
| arg!(-a --cacert <CACERT> "CA certificate chain file name") |
| .required_if_eq("insecure", "false") |
| .value_parser(clap::value_parser!(String)), |
| ) |
| .arg( |
| arg!(-p --policy <POLICY> "Client authentication policy file name") |
| .required_if_eq("insecure", "false") |
| .value_parser(clap::value_parser!(String)), |
| ) |
| .arg( |
| arg!(-s --server_dns <SERVERNDS> "gRPC server's certificate DNS name, extract from command: |
| $ openssl x509 -noout -text -in grpc-server-cert.pem | grep DNS |
| DNS:bcm-node-name.some-company.com") |
| .required_if_eq("insecure", "false") |
| .value_parser(clap::value_parser!(String)), |
| ) |
| .arg( |
| arg!(-h --host <HOST> "gRPC server's host name") |
| .default_value("localhost") |
| .value_parser(clap::value_parser!(String)), |
| ) |
| .arg( |
| arg!(-P --port <PORT> "Server's listening port to connect") |
| .required(true) |
| .value_parser(clap::value_parser!(u16)), |
| ) |
| } |
| |
| fn load_cert(file_name: &str) -> Result<Vec<u8>, std::io::Error> { |
| let mut file = File::open(file_name)?; |
| let mut buffer = Vec::new(); |
| file.read_to_end(&mut buffer)?; |
| Ok(buffer) |
| } |
| |
| fn load_key(path: &str) -> Result<Vec<u8>, std::io::Error> { |
| load_cert(path) // Since they are both loaded the same way |
| } |
| |
| #[tokio::main] |
| async fn main() -> Result<(), Box<dyn std::error::Error>> { |
| // TODO: disable log due to https://github.com/extrawurst/gitui/issues/2094 |
| // env_logger::init_from_env( |
| // env_logger::Env::default().default_filter_or("info,tonic=debug,hyper=debug,rustls=debug"), |
| // ); |
| |
| let matches = cli().get_matches(); |
| let insecure = matches.get_flag("insecure"); |
| let host = matches.get_one::<String>("host").unwrap(); |
| let port = matches |
| .get_one::<u16>("port") |
| .expect("required argument") |
| .to_owned(); |
| |
| let url = if insecure { |
| format!("http://{host}:{port}") |
| } else { |
| format!("https://{host}:{port}") |
| }; |
| let channel = if insecure { |
| Channel::builder(url.parse()?).connect().await? |
| } else { |
| #[cfg(feature = "mtls")] |
| { |
| let key = matches.get_one::<String>("key").unwrap(); |
| let cert = matches.get_one::<String>("cert").unwrap(); |
| let cacert = matches.get_one::<String>("cacert").unwrap(); |
| let server_dns = matches.get_one::<String>("server_dns").unwrap(); |
| |
| // Load client's certificate and private key |
| let cert = load_cert(cert)?; |
| let key = load_key(key)?; |
| let client_identity = Identity::from_pem(cert, key); |
| |
| // Load CA certificate to verify the server |
| let server_ca_cert = load_cert(cacert)?; |
| let server_ca_cert = Certificate::from_pem(server_ca_cert); |
| |
| let tls_config = ClientTlsConfig::new() |
| .identity(client_identity) |
| .ca_certificate(server_ca_cert) |
| .domain_name(server_dns); |
| |
| Channel::builder(url.parse()?) |
| .tls_config(tls_config)? |
| .connect() |
| .await? |
| } |
| #[cfg(not(feature = "mtls"))] |
| { |
| return Err("TLS support is not enabled".into()); |
| } |
| }; |
| |
| let mut client = MachineTelemetryClient::new(channel); |
| |
| let outbound = async_stream::stream! { |
| let requests = vec![ |
| TelemetryRequest { |
| req_id: "req_repairability".into(), |
| req_config_group: "repairability_basic_cfg_group".into(), |
| ..Default::default() |
| }, |
| ]; |
| |
| for request in requests { |
| yield request; |
| } |
| }; |
| |
| // Make the call to SubscribeV2 |
| let response = client.subscribe_v2(Request::new(outbound)).await?; |
| |
| // Handle the stream of responses |
| let mut inbound = response.into_inner(); |
| while let Some(update) = inbound.next().await { |
| match update { |
| Ok(update) => { |
| #[cfg(feature = "client_debug")] |
| println!("Received update: {:?}", update); |
| let _ = print_datapoint(&update).await; |
| } |
| Err(e) => eprintln!("Error receiving update: {:?}", e), |
| } |
| } |
| |
| Ok(()) |
| } |