// blob: 4dca33c1ecdc64e05102ab7434d1ce5514fc46c9 [file] [log] [blame]
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use anyhow::{anyhow, Result};
use chrono::{DateTime, NaiveDateTime, Utc};
use clap::{arg, Command};
use futures_util::stream::StreamExt;
#[cfg(feature = "client_debug")]
use redfish_codegen::models::sensor::v1_7_0::Sensor as SensorModel;
#[cfg(feature = "client_debug")]
use reqwest::Client;
#[cfg(feature = "client_debug")]
use serde_json::Value;
#[cfg(feature = "client_debug")]
use std::collections::HashMap;
use std::fs::File;
use std::io::Read;
use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity};
use tonic::Request;
mod app_state;
mod composite_query;
mod dbus_client;
mod grpc;
mod handlers;
mod telemetry_source_manager;
#[cfg(feature = "client_debug")]
use crate::grpc::third_party_voyager::{Fqp, FqpType, RequestFqp};
use crate::grpc::third_party_voyager::{
data_point, machine_telemetry_client::MachineTelemetryClient, typed_value,
Request as TelemetryRequest, Update,
};
/// Renders a Unix timestamp given in nanoseconds as a UTC string shaped like
/// `YYYY:MM:DD:HH:MM:SS.mmm`.
///
/// # Panics
///
/// Panics with "Invalid timestamp" if chrono rejects the derived
/// seconds/nanoseconds pair.
fn format_timestamp(timestamp_ns: u64) -> String {
    const NANOS_PER_SEC: u64 = 1_000_000_000;

    // Split the raw value into whole seconds and the sub-second remainder.
    let whole_secs = (timestamp_ns / NANOS_PER_SEC) as i64;
    let subsec_nanos = (timestamp_ns % NANOS_PER_SEC) as u32;

    // Build the UTC instant in a single expression.
    let utc = DateTime::<Utc>::from_utc(
        NaiveDateTime::from_timestamp_opt(whole_secs, subsec_nanos).expect("Invalid timestamp"),
        Utc,
    );

    utc.format("%Y:%m:%d:%H:%M:%S.%3f").to_string()
}
/// Cross-checks a streamed sensor value against the Redfish resource served locally.
///
/// Fetches `http://localhost:8088{odata_id}`, parses it as a Redfish sensor, clamps the
/// fetched reading into its advertised `[min, max]` range and compares it with
/// `sensor_value`.
///
/// # Errors
///
/// Returns an error if the HTTP request or JSON parsing fails, if the fetched sensor
/// lacks a reading or its range, or if `sensor_value` differs from the fetched reading
/// by more than 5%.
#[cfg(feature = "client_debug")]
async fn verify_against_redfish(client: &Client, odata_id: &str, sensor_value: f64) -> Result<()> {
    // Fetch data from local server
    let url = format!("http://localhost:8088{}", odata_id);
    let response = client.get(&url).send().await?;
    let body = response.text().await?;
    let sensor: SensorModel = serde_json::from_str(&body)?;

    if let (Some(reading), Some(min), Some(max)) = (
        sensor.reading,
        sensor.reading_range_min,
        sensor.reading_range_max,
    ) {
        let corrected_reading = reading.max(min).min(max);
        println!(" Fetched Reading: {}", corrected_reading);

        // Compare readings. The tolerance is taken from the magnitude of the reading:
        // with a signed threshold every negative reading would fail the check.
        let difference = (sensor_value - corrected_reading).abs();
        let threshold = corrected_reading.abs() * 0.05; // 5% of the reading
        if difference > threshold {
            eprintln!("Error: sensor {odata_id} value {sensor_value} is out of 5% range of fetched Reading {corrected_reading}");
            return Err(anyhow!("Value not matching Redfish query result"));
        }
        Ok(())
    } else {
        Err(anyhow!("Failed to get Reading from fetched data"))
    }
}

/// Prints every data point carried by `update` on standard output.
///
/// For key/value data points the `@odata.id` and `SensorValue` fields are printed (or a
/// note that the field is absent or has an unexpected type), followed by the formatted
/// timestamp of the data point. A blank line terminates the update.
///
/// With the `client_debug` feature enabled, each data point that carries both fields is
/// additionally verified against the local Redfish server.
///
/// # Errors
///
/// Only fails when the `client_debug` verification fails; without that feature the
/// function always returns `Ok(())`.
async fn print_datapoint(update: &Update) -> Result<()> {
    // One HTTP client is enough for all data points of the update.
    #[cfg(feature = "client_debug")]
    let client = Client::new();

    for (index, datapoint) in update.data_points.iter().enumerate() {
        if let Some(data_point::Data::KeyValue(typed_struct)) = &datapoint.data {
            print!("{index}th datapoint, ");

            // The extracted values are only consumed by the `client_debug` verification.
            #[cfg_attr(not(feature = "client_debug"), allow(unused_variables))]
            let odata_id = match typed_struct
                .fields
                .get("@odata.id")
                .and_then(|v| v.value.as_ref())
            {
                Some(typed_value::Value::StringVal(id)) => {
                    print!("@odata.id: {}, ", id);
                    Some(id)
                }
                None => {
                    print!("@odata.id not changed, ");
                    None
                }
                _ => {
                    print!("@odata.id invalid type, ");
                    None
                }
            };

            // Same as above: only read when `client_debug` is enabled.
            #[cfg_attr(not(feature = "client_debug"), allow(unused_variables))]
            let sensor_value = match typed_struct
                .fields
                .get("SensorValue")
                .and_then(|v| v.value.as_ref())
            {
                Some(typed_value::Value::DoubleVal(value)) => {
                    print!("value: {}, ", value);
                    Some(*value)
                }
                None => {
                    print!("value not changed, ");
                    None
                }
                _ => {
                    print!("value invalid type, ");
                    None
                }
            };

            #[cfg(feature = "client_debug")]
            {
                // Verification needs both the resource path and the streamed value; a
                // data point that omits either one cannot be checked and is skipped.
                if let (Some(odata_id), Some(sensor_value)) = (odata_id, sensor_value) {
                    verify_against_redfish(&client, odata_id, sensor_value).await?;
                }
            }
        }
        println!("timestamp: {}", format_timestamp(datapoint.timestamp_ns));
    }
    println!();
    Ok(())
}
/// Builds the command-line definition of the telemetry client.
///
/// * `-i/--insecure` switches to a plain-text connection.
/// * `-k/--key`, `-c/--cert`, `-a/--cacert` and `-s/--server_dns` configure TLS and are
///   required unless `--insecure` is given.
/// * `-p/--policy` is accepted but optional; the visible `main` does not read it.
/// * `-h/--host` defaults to `localhost`; `-P/--port` is always required.
///
/// Help is available through `--help` only, because `-h` is taken by `--host`.
fn cli() -> Command {
    Command::new("my_client")
        .about("Starts a client with the specified configuration")
        // `-h` is used by `--host` below. Keeping the auto-generated `-h/--help` flag
        // would register the same short option twice, so it is replaced by a long-only
        // `--help`.
        .disable_help_flag(true)
        .arg(
            clap::Arg::new("help")
                .long("help")
                .action(clap::ArgAction::Help)
                .help("Print help"),
        )
        .arg(
            // `SetTrue` already implies "not required" and a default of `false`.
            arg!(-i --insecure "Disables SSL certificate verification")
                .action(clap::ArgAction::SetTrue),
        )
        .arg(
            // `required_if_eq("insecure", "false")` never triggers: clap does not take
            // default values into account for conditional requirements, and the flag can
            // only ever be set explicitly to `true`. `required_unless_present` expresses
            // the intended rule.
            arg!(-k --key <KEY> "Client's private key file name")
                .required_unless_present("insecure")
                .value_parser(clap::value_parser!(String)),
        )
        .arg(
            arg!(-c --cert <CERT> "Client's certificate file name")
                .required_unless_present("insecure")
                .value_parser(clap::value_parser!(String)),
        )
        .arg(
            arg!(-a --cacert <CACERT> "CA certificate chain file name")
                .required_unless_present("insecure")
                .value_parser(clap::value_parser!(String)),
        )
        .arg(
            // Optional on purpose: the value is not consumed by `main`, so enforcing it
            // would reject invocations that work today.
            arg!(-p --policy <POLICY> "Client authentication policy file name")
                .required(false)
                .value_parser(clap::value_parser!(String)),
        )
        .arg(
            arg!(-s --server_dns <SERVER_DNS> "gRPC server's certificate DNS name, extract from command:\n\
                 $ openssl x509 -noout -text -in grpc-server-cert.pem | grep DNS\n\
                 DNS:bcm-node-name.some-company.com")
                .required_unless_present("insecure")
                .value_parser(clap::value_parser!(String)),
        )
        .arg(
            arg!(-h --host <HOST> "gRPC server's host name")
                .default_value("localhost")
                .value_parser(clap::value_parser!(String)),
        )
        .arg(
            arg!(-P --port <PORT> "Server's listening port to connect")
                .required(true)
                .value_parser(clap::value_parser!(u16)),
        )
}
/// Reads the whole file at `file_name` into memory and returns its raw bytes.
///
/// # Errors
///
/// Returns the underlying [`std::io::Error`] when the file cannot be opened or read.
fn load_cert(file_name: &str) -> Result<Vec<u8>, std::io::Error> {
    let mut contents = Vec::new();
    // Open and drain the file in one expression; the handle is closed right afterwards.
    File::open(file_name)?.read_to_end(&mut contents)?;
    Ok(contents)
}
/// Reads the private key file at `path` into memory.
///
/// Key files are read exactly like certificate files, so this forwards to `load_cert`.
///
/// # Errors
///
/// Returns the [`std::io::Error`] reported by `load_cert`.
fn load_key(path: &str) -> Result<Vec<u8>, std::io::Error> {
    let key_bytes = load_cert(path)?;
    Ok(key_bytes)
}
/// Looks up a string argument that the TLS connection path cannot work without.
///
/// # Errors
///
/// Returns a descriptive error instead of panicking when the argument was not supplied.
#[cfg(feature = "mtls")]
fn required_tls_arg<'a>(
    matches: &'a clap::ArgMatches,
    name: &str,
) -> Result<&'a str, Box<dyn std::error::Error>> {
    matches
        .get_one::<String>(name)
        .map(String::as_str)
        .ok_or_else(|| format!("--{name} is required unless --insecure is given").into())
}

/// Entry point: connects to the telemetry server, subscribes with a single request and
/// prints every update received on the response stream.
///
/// With `--insecure` a plain-text channel is used. Otherwise a mutually authenticated
/// TLS channel is built from the key/cert/cacert/server_dns arguments; this requires the
/// `mtls` feature.
///
/// # Errors
///
/// Returns an error when a TLS argument is missing, a certificate or key file cannot be
/// read, the connection cannot be established, the subscription call fails, or TLS is
/// requested in a build without the `mtls` feature.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // TODO: disable log due to https://github.com/extrawurst/gitui/issues/2094
    // env_logger::init_from_env(
    // env_logger::Env::default().default_filter_or("info,tonic=debug,hyper=debug,rustls=debug"),
    // );
    let matches = cli().get_matches();
    let insecure = matches.get_flag("insecure");
    let host = matches
        .get_one::<String>("host")
        .expect("host has a default value");
    let port = *matches.get_one::<u16>("port").expect("required argument");
    let url = if insecure {
        format!("http://{host}:{port}")
    } else {
        format!("https://{host}:{port}")
    };
    let channel = if insecure {
        Channel::builder(url.parse()?).connect().await?
    } else {
        #[cfg(feature = "mtls")]
        {
            // Report missing TLS arguments as errors rather than panicking on `unwrap`.
            let key = required_tls_arg(&matches, "key")?;
            let cert = required_tls_arg(&matches, "cert")?;
            let cacert = required_tls_arg(&matches, "cacert")?;
            let server_dns = required_tls_arg(&matches, "server_dns")?;
            // Load client's certificate and private key
            let cert = load_cert(cert)?;
            let key = load_key(key)?;
            let client_identity = Identity::from_pem(cert, key);
            // Load CA certificate to verify the server
            let server_ca_cert = load_cert(cacert)?;
            let server_ca_cert = Certificate::from_pem(server_ca_cert);
            let tls_config = ClientTlsConfig::new()
                .identity(client_identity)
                .ca_certificate(server_ca_cert)
                .domain_name(server_dns);
            Channel::builder(url.parse()?)
                .tls_config(tls_config)?
                .connect()
                .await?
        }
        #[cfg(not(feature = "mtls"))]
        {
            return Err("TLS support is not enabled".into());
        }
    };
    let mut client = MachineTelemetryClient::new(channel);
    let outbound = async_stream::stream! {
        let requests = vec![
            TelemetryRequest {
                req_id: "req_repairability".into(),
                req_config_group: "repairability_basic_cfg_group".into(),
                ..Default::default()
            },
        ];
        for request in requests {
            yield request;
        }
    };
    // Make the call to SubscribeV2
    let response = client.subscribe_v2(Request::new(outbound)).await?;
    // Handle the stream of responses
    let mut inbound = response.into_inner();
    while let Some(update) = inbound.next().await {
        match update {
            Ok(update) => {
                #[cfg(feature = "client_debug")]
                println!("Received update: {:?}", update);
                // Keep consuming the stream on failure, but do not hide the error.
                if let Err(e) = print_datapoint(&update).await {
                    eprintln!("Error processing update: {e:#}");
                }
            }
            Err(e) => eprintln!("Error receiving update: {:?}", e),
        }
    }
    Ok(())
}