blob: 4d21998f3b30c36471c8d7d0fbdc966242f1e074 [file] [log] [blame]
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![doc = include_str!("README.md")]
use anyhow::Result;
use dashmap::DashMap;
use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Mutex;
use std::sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
};
use std::time::{Duration, SystemTime};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, Notify, RwLock};
/// A trait for asynchronous value reading.
///
/// Implementors provide the values that a `TelemetrySource<T>` publishes. The
/// trait requires `Send + Sync` so that one reader can be shared behind an
/// `Arc` between the manager and its background polling task.
#[async_trait::async_trait]
pub trait AsyncReadValue<T: Clone + Send + 'static>: Send + Sync {
/// Read a batch of values and return them with their timestamps.
///
/// # Arguments
///
/// * `sampling_interval` - The interval between each sample.
/// * `reporting_interval` - The interval for reporting batched values.
///
/// # Returns
///
/// A vector of values with their corresponding timestamps.
///
/// # Errors
///
/// Returns the reader's `std::io::Error` when the batch cannot be read.
async fn read_values(
&self,
sampling_interval: Duration,
reporting_interval: Duration,
) -> Result<Vec<(T, SystemTime)>, std::io::Error>;
/// Read the current value and return it with its timestamp.
///
/// # Returns
///
/// A value with its corresponding timestamp.
///
/// # Errors
///
/// Returns the reader's `std::io::Error` when the value cannot be read.
#[allow(dead_code)]
async fn get_value(&self) -> Result<(T, SystemTime), std::io::Error>;
}
/// Telemetry Source Data Model.
///
/// Describes one pollable source: its keys, the reader used to fetch values,
/// and the interval/flag state shared with the background polling task.
pub struct TelemetrySource<T: Clone + Send + 'static> {
/// Primary key.
name: String,
/// Secondary keys, like Some(vec![("FRU", "CPU"), ("ReadingType", "Temperature")]).
secondary_keys: Option<Vec<(String, String)>>,
/// Client provided reader to get telemetry source value update.
value_reader: Arc<dyn AsyncReadValue<T>>,
/// Default update interval for polling telemetry source value.
/// Also used as the upper bound when the current update interval is changed.
default_update_interval: Duration,
/// Lowest limit of update interval for telemetry source HW.
minimum_update_interval: Duration,
/// Flag to stop background telemetry source HW polling task.
stop_flag: Arc<AtomicBool>,
/// Current update interval for polling telemetry source value.
current_update_interval: AtomicUsize, // Store in milliseconds
/// Current reporting interval for batch reporting.
current_reporting_interval: AtomicUsize, // Store in milliseconds
/// Notifier used to wake up the polling task.
wakeup_flag: Arc<Notify>,
/// Flag to indicate if the telemetry source is currently polling.
is_polling: Arc<std::sync::RwLock<bool>>,
}
impl<T: Clone + Send + 'static> TelemetrySource<T> {
    /// Builds a boxed `TelemetrySource` that is idle (not polling, not stopped).
    ///
    /// # Arguments
    ///
    /// * `name` - Primary key of the telemetry source.
    /// * `secondary_keys` - Optional key/value pairs used for secondary lookups.
    /// * `value_reader` - Shared reader implementing `AsyncReadValue`.
    /// * `default_update_interval` - Default duration between value updates; also
    ///   the initial current update interval.
    /// * `minimum_update_interval` - Shortest allowed duration between value updates.
    /// * `reporting_interval` - Initial interval for batch reporting of values.
    ///
    /// # Returns
    ///
    /// A new `Box<TelemetrySource>` instance.
    pub fn new(
        name: String,
        secondary_keys: Option<Vec<(String, String)>>,
        value_reader: Arc<dyn AsyncReadValue<T>>,
        default_update_interval: Duration,
        minimum_update_interval: Duration,
        reporting_interval: Duration,
    ) -> Box<Self> {
        // Both current intervals are tracked atomically as whole milliseconds.
        let initial_update_ms = default_update_interval.as_millis() as usize;
        let initial_reporting_ms = reporting_interval.as_millis() as usize;

        let telemetry_source = Self {
            name,
            secondary_keys,
            value_reader,
            default_update_interval,
            minimum_update_interval,
            stop_flag: Arc::new(AtomicBool::new(false)),
            current_update_interval: AtomicUsize::new(initial_update_ms),
            current_reporting_interval: AtomicUsize::new(initial_reporting_ms),
            wakeup_flag: Arc::new(Notify::new()),
            is_polling: Arc::new(std::sync::RwLock::new(false)),
        };
        Box::new(telemetry_source)
    }
}
impl<T: Clone + Send + 'static> fmt::Debug for TelemetrySource<T> {
    /// Formats the keys and interval settings of the source.
    ///
    /// The reader, the flags and the notifier are not included in the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let update_interval = self.get_current_interval();
        let reporting_interval = self.get_current_reporting_interval();

        let mut builder = f.debug_struct("TelemetrySource");
        builder.field("name", &self.name);
        builder.field("secondary_keys", &self.secondary_keys);
        builder.field("default_update_interval", &self.default_update_interval);
        builder.field("minimum_update_interval", &self.minimum_update_interval);
        builder.field("current_update_interval", &update_interval);
        builder.field("current_reporting_interval", &reporting_interval);
        builder.finish()
    }
}
/// Type-erased view of a telemetry source.
///
/// Defined for type-erasure purposes, allowing different types of telemetry
/// sources to be stored and managed uniformly in the TelemetrySourceManager.
/// Use [`TelemetrySourceTrait::as_any`] to downcast back to the concrete type.
pub trait TelemetrySourceTrait: Send + Sync {
/// Gets the name of the telemetry source.
fn get_name(&self) -> &str;
/// Gets the secondary keys of the telemetry source.
fn get_secondary_keys(&self) -> Option<&Vec<(String, String)>>;
/// Sets the stop flag for the telemetry source.
#[allow(dead_code)]
fn set_stop_flag(&self, value: bool);
/// Gets the current polling interval.
fn get_current_interval(&self) -> Duration;
/// Sets the current polling interval.
fn set_current_interval(&self, new_interval: Duration);
/// Gets the current reporting interval.
fn get_current_reporting_interval(&self) -> Duration;
/// Sets the current reporting interval.
fn set_current_reporting_interval(&self, new_interval: Duration);
/// Gets the default update interval.
fn get_default_update_interval(&self) -> Duration;
/// Gets the notifier used to wake up the polling task of the telemetry source.
fn get_wakeup_flag(&self) -> Arc<Notify>;
/// Gets the is_polling flag for the telemetry source.
fn get_is_polling(&self) -> Arc<std::sync::RwLock<bool>>;
/// Converts the trait object to Any.
fn as_any(&self) -> &dyn Any;
}
impl<T: Clone + Send + 'static> TelemetrySourceTrait for TelemetrySource<T> {
    /// Returns the primary key of this source.
    fn get_name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns the secondary key/value pairs, if any were configured.
    fn get_secondary_keys(&self) -> Option<&Vec<(String, String)>> {
        self.secondary_keys.as_ref()
    }

    /// Stores a new polling interval.
    ///
    /// The request is first raised to the minimum update interval and then
    /// capped at the default update interval. When the stored reporting
    /// interval is shorter than the result, it is raised to match.
    fn set_current_interval(&self, new_interval: Duration) {
        let floor_ms = self.minimum_update_interval.as_millis();
        let ceiling_ms = self.default_update_interval.as_millis();
        let bounded_ms = new_interval.as_millis().max(floor_ms).min(ceiling_ms) as usize;

        self.current_update_interval
            .store(bounded_ms, Ordering::SeqCst);

        let reporting_ms = self.current_reporting_interval.load(Ordering::SeqCst);
        if reporting_ms < bounded_ms {
            self.set_current_reporting_interval(Duration::from_millis(bounded_ms as u64));
        }
    }

    /// Stores a new reporting interval, never shorter than the current polling interval.
    fn set_current_reporting_interval(&self, new_interval: Duration) {
        let requested_ms = new_interval.as_millis() as usize;
        let update_ms = self.current_update_interval.load(Ordering::SeqCst);
        self.current_reporting_interval
            .store(requested_ms.max(update_ms), Ordering::SeqCst);
    }

    /// Returns the current polling interval (millisecond resolution).
    fn get_current_interval(&self) -> Duration {
        let millis = self.current_update_interval.load(Ordering::SeqCst);
        Duration::from_millis(millis as u64)
    }

    /// Returns the current reporting interval (millisecond resolution).
    fn get_current_reporting_interval(&self) -> Duration {
        let millis = self.current_reporting_interval.load(Ordering::SeqCst);
        Duration::from_millis(millis as u64)
    }

    /// Returns the default update interval given at construction.
    fn get_default_update_interval(&self) -> Duration {
        self.default_update_interval
    }

    /// Sets the flag that the polling task checks to decide whether to exit.
    fn set_stop_flag(&self, value: bool) {
        self.stop_flag.store(value, Ordering::SeqCst);
    }

    /// Returns a shared handle to the notifier that wakes the polling task.
    fn get_wakeup_flag(&self) -> Arc<Notify> {
        Arc::clone(&self.wakeup_flag)
    }

    /// Returns a shared handle to the `is_polling` flag.
    fn get_is_polling(&self) -> Arc<std::sync::RwLock<bool>> {
        Arc::clone(&self.is_polling)
    }

    /// Exposes `self` as `&dyn Any` so callers can downcast to `TelemetrySource<T>`.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
/// Telemetry struct representing a value change or periodic update.
///
/// This is the concrete, typed payload delivered to subscribers; it is handed
/// out as a boxed `Telemetry` trait object and can be recovered by downcasting.
#[derive(Clone, Debug, PartialEq)]
pub struct TypedTelemetry<T: Clone + Send + 'static> {
/// Telemetry source name associated with the telemetry.
pub source_name: String,
/// Telemetry type as defined in Redfish spec for the Telemetry.
pub telemetry_type: String,
/// Value associated with the telemetry.
pub value: T,
/// Sampling timestamp.
pub timestamp: SystemTime,
}
/// Type-erased view of a telemetry item.
///
/// This trait allows for type-agnostic telemetry handling, enabling the system to work
/// with different types of telemetries without knowing their specific types.
pub trait Telemetry: Send + Debug {
/// Gets the source name of the telemetry.
#[allow(dead_code)]
fn source_name(&self) -> &str;
/// Gets the telemetry type.
#[allow(dead_code)]
fn telemetry_type(&self) -> &str;
/// Gets the timestamp of the telemetry.
#[allow(dead_code)]
fn timestamp(&self) -> SystemTime;
/// Converts the trait object to Any, so it can be downcast to its concrete type.
fn as_any(&self) -> &dyn Any;
}
impl<T: Clone + Send + Debug + 'static> Telemetry for TypedTelemetry<T> {
    /// Name of the source that produced this telemetry.
    fn source_name(&self) -> &str {
        self.source_name.as_str()
    }

    /// Telemetry type label carried by this telemetry.
    fn telemetry_type(&self) -> &str {
        self.telemetry_type.as_str()
    }

    /// Timestamp at which the value was sampled.
    fn timestamp(&self) -> SystemTime {
        let Self { timestamp, .. } = self;
        *timestamp
    }

    /// Exposes `self` as `&dyn Any` for downcasting to `TypedTelemetry<T>`.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
/// Interval between two samples of a periodical subscription.
pub type SampleInterval = Duration;
/// Interval between two batch reports of a periodical subscription.
pub type ReportInterval = Duration;
/// Subscription Type Enum.
#[derive(Clone, Debug, PartialEq)]
pub enum SubscriptionType {
/// Send a Telemetry whenever the value changes.
OnChange,
/// Periodic reporting, given as `(sampling interval, reporting interval)`.
///
/// The requested sampling interval is bounded by the telemetry source: it is
/// never shorter than the source's minimum_update_interval and never longer
/// than its default update interval.
/// The reporting interval should be no shorter than the sampling interval.
Periodical(SampleInterval, ReportInterval),
}
/// Type alias for telemetry source subscription ID.
type SubscriptionId = usize;
/// Type alias for telemetry sender: the sending half of the channel on which
/// batches of type-erased telemetries are delivered to a subscriber.
type TelemetrySender = Sender<Vec<Box<dyn Telemetry + Send>>>;
/// Represents a subscription to a telemetry source.
///
/// This trait defines the interface for managing subscriptions to telemetry data.
/// Implementations of this trait should be able to handle different types of
/// telemetry data and subscription models.
///
/// The trait is type-erased: values cross it as `Box<dyn Any>` and the concrete
/// subscription can be recovered through `as_any` / `as_any_mut`.
pub trait Subscription: Send + Sync {
/// Returns a reference to the telemetry sender associated with this subscription.
///
/// # Returns
///
/// A reference to the `TelemetrySender` used to send telemetry data.
fn get_sender(&self) -> &TelemetrySender;
/// Returns the type of this subscription.
///
/// # Returns
///
/// A reference to the `SubscriptionType` indicating whether this is an OnChange
/// or Periodical subscription.
fn get_subscription_type(&self) -> &SubscriptionType;
/// Retrieves the timestamp of the last telemetry update.
///
/// # Returns
///
/// An `Option<SystemTime>` representing the timestamp of the last telemetry update,
/// or `None` if no update has occurred yet.
#[allow(dead_code)]
fn get_last_telemetry_timestamp(&self) -> Option<SystemTime>;
/// Sets the timestamp of the last telemetry update.
///
/// # Arguments
///
/// * `timestamp` - A `SystemTime` representing the new last update timestamp.
#[allow(dead_code)]
fn set_last_telemetry_timestamp(&self, timestamp: SystemTime);
/// Retrieves the current value of the telemetry data.
///
/// # Returns
///
/// A `Box<dyn Any>` containing the current telemetry value.
#[allow(dead_code)]
fn get_current_value(&self) -> Box<dyn Any>;
/// Sets the current value of the telemetry data.
///
/// # Arguments
///
/// * `value` - A `Box<dyn Any>` containing the new telemetry value.
#[allow(dead_code)]
fn set_current_value(&self, value: Box<dyn Any>);
/// Converts this trait object to `&dyn Any`.
///
/// This method is used for downcasting.
///
/// # Returns
///
/// A reference to `dyn Any`.
#[allow(dead_code)]
fn as_any(&self) -> &dyn Any;
/// Converts this mutable trait object to `&mut dyn Any`.
///
/// This method is used for mutable downcasting.
///
/// # Returns
///
/// A mutable reference to `dyn Any`.
fn as_any_mut(&mut self) -> &mut dyn Any;
}
/// A typed implementation of the `Subscription` trait.
///
/// This struct provides a concrete implementation of `Subscription` for a specific
/// telemetry data type `T`.
///
/// # Type Parameters
///
/// * `T`: The type of telemetry data this subscription handles. It must be `Clone`,
/// `Send`, and have a `'static` lifetime.
pub struct TypedSubscription<T: Clone + Send + 'static> {
/// The sender used to transmit telemetry data.
sender: TelemetrySender,
/// The type of this subscription (OnChange or Periodical).
subscription_type: SubscriptionType,
/// The timestamp of the last telemetry update, wrapped in a mutex for thread-safe access.
/// `None` until the first update is recorded.
last_telemetry_timestamp: Mutex<Option<SystemTime>>,
/// The current value of the telemetry data, wrapped in a mutex for thread-safe access.
/// `None` until the first value is recorded.
current_value: Mutex<Option<T>>,
}
impl<T: Clone + Send + 'static> Subscription for TypedSubscription<T> {
    /// Borrows the channel sender used to deliver telemetry batches.
    fn get_sender(&self) -> &TelemetrySender {
        &self.sender
    }

    /// Borrows the subscription type (OnChange or Periodical).
    fn get_subscription_type(&self) -> &SubscriptionType {
        &self.subscription_type
    }

    /// Returns the last recorded telemetry timestamp, or `None` if none was recorded.
    ///
    /// # Panics
    ///
    /// Panics if the timestamp mutex is poisoned.
    fn get_last_telemetry_timestamp(&self) -> Option<SystemTime> {
        let guard = self.last_telemetry_timestamp.lock().unwrap();
        *guard
    }

    /// Records `timestamp` as the last telemetry timestamp.
    ///
    /// # Panics
    ///
    /// Panics if the timestamp mutex is poisoned.
    fn set_last_telemetry_timestamp(&self, timestamp: SystemTime) {
        let mut guard = self.last_telemetry_timestamp.lock().unwrap();
        *guard = Some(timestamp);
    }

    /// Returns a boxed clone of the stored value; the boxed type is `Option<T>`.
    ///
    /// # Panics
    ///
    /// Panics if the value mutex is poisoned.
    fn get_current_value(&self) -> Box<dyn Any> {
        let snapshot: Option<T> = self.current_value.lock().unwrap().clone();
        Box::new(snapshot)
    }

    /// Stores `value` when it holds a `T`; any other type is silently ignored.
    ///
    /// # Panics
    ///
    /// Panics if the value mutex is poisoned.
    fn set_current_value(&self, value: Box<dyn Any>) {
        match value.downcast::<T>() {
            Ok(typed_value) => {
                let mut guard = self.current_value.lock().unwrap();
                *guard = Some(*typed_value);
            }
            Err(_) => {}
        }
    }

    /// Exposes `self` as `&dyn Any` for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Exposes `self` as `&mut dyn Any` for mutable downcasting.
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}
/// Registry of telemetry sources and of the subscriptions attached to them.
///
/// Sources are indexed by name (primary key) and by secondary key/value pairs.
pub struct TelemetrySourceManager {
/// Object store with primary key.
sources: DashMap<String, Arc<Box<dyn TelemetrySourceTrait>>>,
/// Secondary key maps, as <key_name, HashMap<key_value, Vec<primary_keys>>>.
secondary_map: DashMap<String, HashMap<String, Vec<String>>>,
/// Subscription map as <telemetry source name, Subscriptions>.
telemetry_subscriptions: DashMap<String, HashMap<SubscriptionId, Box<dyn Subscription>>>,
/// Atomic variable used to generate next subscription_id.
next_subscription_id: AtomicUsize,
/// The lock is only needed for add/remove telemetry sources.
lock: RwLock<()>,
}
impl TelemetrySourceManager {
/// Creates an empty `TelemetrySourceManager`.
///
/// Subscription ids start at 0.
///
/// # Returns
///
/// A new `Arc<TelemetrySourceManager>` instance.
pub fn new() -> Arc<Self> {
    let manager = Self {
        sources: DashMap::new(),
        secondary_map: DashMap::new(),
        telemetry_subscriptions: DashMap::new(),
        next_subscription_id: AtomicUsize::new(0),
        lock: RwLock::new(()),
    };
    Arc::new(manager)
}
/// Adds an telemetry source to the manager.
///
/// # Arguments
///
/// * `source` - The telemetry source to be added.
///
/// # Returns
///
/// A Result indicating success or failure.
pub async fn add_source<T: Clone + Send + PartialEq + Debug + 'static>(
self: &Arc<Self>,
source: Box<dyn TelemetrySourceTrait>,
) -> Result<()> {
let _write_guard = self.lock.write().await;
let source_name = source.get_name().to_string();
if let Some(secondary_keys) = source.get_secondary_keys() {
for (key_name, key_value) in secondary_keys {
self.secondary_map
.entry(key_name.clone())
.or_default()
.entry(key_value.clone())
.or_default()
.push(source_name.clone());
}
}
let source = Arc::new(source);
self.sources.insert(source_name.clone(), source.clone());
let manager = self.clone();
tokio::spawn(async move {
loop {
if source
.as_any()
.downcast_ref::<TelemetrySource<T>>()
.expect("Failed to downcast source")
.stop_flag
.load(Ordering::SeqCst)
{
break;
}
source.get_wakeup_flag().notified().await;
if !*source.get_is_polling().read().unwrap() {
continue;
}
loop {
if source
.as_any()
.downcast_ref::<TelemetrySource<T>>()
.expect("Failed to downcast source")
.stop_flag
.load(Ordering::SeqCst)
{
return;
}
let update_interval = source.get_current_interval();
let reporting_interval = source.get_current_reporting_interval();
if !*source.get_is_polling().read().unwrap() {
break;
}
match source
.as_any()
.downcast_ref::<TelemetrySource<T>>()
.expect("Failed to downcast source")
.value_reader
.read_values(update_interval, reporting_interval)
.await
{
Ok(values) => {
if !values.is_empty() {
if let Err(e) = manager.send_telemetries(&source_name, values).await
{
eprintln!("Error sending telemetries: {}", e);
}
}
}
Err(e) => eprintln!("Error reading values: {}", e),
}
}
}
});
Ok(())
}
/// Removes a telemetry source from the manager and stops its polling task.
///
/// CAUTION! All references obtained from the query interfaces to this source must be
/// dropped before making this call, otherwise this call will deadlock.
///
/// The source's stop flag is set, polling is disabled, and the polling task is
/// woken so that it observes the stop flag and exits even when it is currently
/// parked waiting for a subscription. The source's secondary-key entries and
/// its subscriptions are removed as well.
///
/// # Arguments
///
/// * `source_name` - The name of the telemetry source to remove.
///
/// # Returns
///
/// A Result indicating success or failure.
///
/// # Errors
///
/// Returns an error when no source named `source_name` is registered.
#[allow(dead_code)]
pub async fn remove_source(self: &Arc<Self>, source_name: &str) -> Result<()> {
    // Serialize against add_source: this is the add/remove lock.
    let _write_guard = self.lock.write().await;

    let (_, source) = match self.sources.remove(source_name) {
        Some(removed) => removed,
        None => return Err(anyhow::anyhow!("Source not found: {}", source_name)),
    };

    source.set_stop_flag(true);
    *source.get_is_polling().write().unwrap() = false;
    // Wake the polling task: if it is parked on the notifier it would otherwise
    // never re-check the stop flag and would be leaked together with the
    // references it holds to the source and the manager.
    source.get_wakeup_flag().notify_one();

    if let Some(secondary_keys) = source.get_secondary_keys() {
        for (key_name, key_value) in secondary_keys {
            if let Some(mut key_map) = self.secondary_map.get_mut(key_name) {
                let now_empty = match key_map.get_mut(key_value) {
                    Some(names) => {
                        names.retain(|name| name != source_name);
                        names.is_empty()
                    }
                    None => false,
                };
                // Drop the value bucket once its last source is gone.
                if now_empty {
                    key_map.remove(key_value);
                }
            }
        }
    }
    self.telemetry_subscriptions.remove(source_name);
    Ok(())
}
/// Lists the names of all secondary keys currently present in the secondary map.
///
/// # Returns
///
/// A vector of strings representing all secondary keys.
#[allow(dead_code)]
pub fn query_all_secondary_keys(&self) -> Vec<String> {
    let mut key_names = Vec::new();
    for entry in self.secondary_map.iter() {
        key_names.push(entry.key().clone());
    }
    key_names
}
/// Looks up telemetry sources by primary key.
///
/// # Arguments
///
/// * `primary_keys` - A slice of primary key strings. An empty slice selects
///   every registered source.
///
/// # Returns
///
/// A Result containing the names of the registered sources matching the primary keys,
/// or an error if no sources are found. Unknown keys are skipped.
pub fn query_sources_by_names(&self, primary_keys: &[&str]) -> Result<Vec<String>> {
    let mut matched: Vec<String> = Vec::new();
    if primary_keys.is_empty() {
        for entry in self.sources.iter() {
            matched.push(entry.key().clone());
        }
    } else {
        for &key in primary_keys {
            if self.sources.contains_key(key) {
                matched.push(key.to_string());
            }
        }
    }

    if matched.is_empty() {
        return Err(anyhow::anyhow!("No sources found"));
    }
    Ok(matched)
}
/// Looks up telemetry sources by a secondary key property.
///
/// # Arguments
///
/// * `key` - A tuple containing the secondary key name and value.
///
/// # Returns
///
/// A Result containing the names of the sources matching the secondary key property,
/// or an error if the key name/value pair is not found or no sources match.
#[allow(dead_code)]
pub fn query_sources_by_property(&self, key: (&str, &str)) -> Result<Vec<String>> {
    let (key_name, key_value) = key;
    let found = self
        .secondary_map
        .get(key_name)
        .and_then(|by_value| by_value.get(key_value).cloned());

    match found {
        None => Err(anyhow::anyhow!("Secondary key not found: {}", key_name)),
        Some(names) if names.is_empty() => {
            Err(anyhow::anyhow!("No sources found for the given property"))
        }
        Some(names) => Ok(names),
    }
}
/// Subscribes to telemetry source telemetries.
///
/// CAUTION! The returned TelemetrySourceSubscription need to be moved to a separate task to
/// run, otherwise drop it may cause deadlock.
///
/// # Arguments
///
/// * `source_name` - The name of the telemetry source to subscribe to.
/// * `subscription_type` - The type of subscription (OnChange, Periodical).
///
/// # Returns
///
/// A Result containing an TelemetrySourceSubscription, or an error if the source is not found.
pub fn subscribe_telemetries<T: Clone + Send + PartialEq + 'static>(
self: &Arc<Self>,
source_name: &str,
subscription_type: SubscriptionType,
) -> Result<TelemetrySourceSubscription<T>> {
if self.sources.contains_key(source_name) {
let (telemetry_sender, telemetry_receiver) = mpsc::channel(100);
let subscription_id = self.next_subscription_id.fetch_add(1, Ordering::SeqCst);
let subscription: TypedSubscription<T> = TypedSubscription {
sender: telemetry_sender.clone(),
subscription_type: subscription_type.clone(),
last_telemetry_timestamp: Mutex::new(None),
current_value: Mutex::new(None),
};
self.telemetry_subscriptions
.entry(source_name.to_string())
.or_default()
.insert(subscription_id, Box::new(subscription));
if self.telemetry_subscriptions.get(source_name).unwrap().len() == 1 {
if let Some(source) = self.sources.get(source_name) {
*source.get_is_polling().write().unwrap() = true;
source.get_wakeup_flag().notify_one();
}
}
let subscriptions = self.telemetry_subscriptions.get(source_name).unwrap();
self.update_polling_interval(source_name, &subscriptions);
Ok(TelemetrySourceSubscription {
source_name: source_name.to_string(),
subscription_id,
telemetry_receiver,
subscription_type,
telemetry_source_manager: self.clone(),
_phantom: PhantomData,
})
} else {
Err(anyhow::anyhow!("Source not found: {}", source_name))
}
}
/// Removes one subscription from a telemetry source.
///
/// When the last subscription of the source is removed, polling is disabled
/// and the polling interval is reset to the source's default. Otherwise the
/// intervals are recomputed from the remaining subscriptions. An unknown
/// `subscription_id` is not treated as an error.
///
/// # Arguments
///
/// * `source_name` - The name of the telemetry source.
/// * `subscription_id` - The subscription ID to unsubscribe.
///
/// # Returns
///
/// A Result indicating success, or an error when the source has no
/// subscription entry.
pub fn unsubscribe_telemetries(&self, source_name: &str, subscription_id: usize) -> Result<()> {
    let mut remaining = match self.telemetry_subscriptions.get_mut(source_name) {
        Some(entry) => entry,
        None => return Err(anyhow::anyhow!("Source not found: {}", source_name)),
    };
    remaining.remove(&subscription_id);

    if !remaining.is_empty() {
        self.update_polling_interval(source_name, &remaining);
        return Ok(());
    }

    // Nobody is listening any more: stop polling and restore the default interval.
    if let Some(source) = self.sources.get(source_name) {
        *source.get_is_polling().write().unwrap() = false;
        let default_interval = source.get_default_update_interval();
        source.set_current_interval(default_interval);
    }
    Ok(())
}
/// Updates the polling interval for an telemetry source based on its subscriptions.
///
/// # Arguments
///
/// * `source_name` - The name of the telemetry source.
/// * `subscriptions` - The current subscriptions for the telemetry source.
fn update_polling_interval(
&self,
source_name: &str,
subscriptions: &HashMap<SubscriptionId, Box<dyn Subscription>>,
) {
if let Some(source) = self.sources.get(source_name) {
let min_sampling_interval = subscriptions
.values()
.filter_map(|sub| match sub.get_subscription_type() {
SubscriptionType::Periodical(sampling_interval, _) => Some(*sampling_interval),
_ => None,
})
.min()
.unwrap_or_else(|| source.get_default_update_interval());
source.set_current_interval(min_sampling_interval);
let min_reporting_interval = subscriptions
.values()
.filter_map(|sub| match sub.get_subscription_type() {
SubscriptionType::Periodical(_, reporting_interval) => {
Some(*reporting_interval)
}
_ => None,
})
.min()
.unwrap_or_else(|| source.get_current_reporting_interval());
source.set_current_reporting_interval(min_reporting_interval);
}
}
/// Sends telemetry source telemetries to subscribers.
///
/// # Arguments
///
/// * `source_name` - The name of the telemetry source.
/// * `values` - The telemetry source telemetries to send.
///
/// # Returns
///
/// A Result indicating success or failure.
async fn send_telemetries<T: Clone + Send + Debug + PartialEq + 'static>(
&self,
source_name: &str,
values: Vec<(T, SystemTime)>,
) -> Result<()> {
if let Some(mut subscriptions) = self.telemetry_subscriptions.get_mut(source_name) {
for (_, subscription) in subscriptions.iter_mut() {
let typed_subscription = subscription
.as_any_mut()
.downcast_mut::<TypedSubscription<T>>()
.unwrap();
let telemetry_sender = typed_subscription.get_sender().clone();
let subscription_type = typed_subscription.get_subscription_type().clone();
let mut telemetries = Vec::new();
match subscription_type {
SubscriptionType::OnChange => {
for (value, timestamp) in &values {
let mut current_value =
typed_subscription.current_value.lock().unwrap();
if current_value.as_ref() != Some(value) {
let telemetry = TypedTelemetry {
source_name: source_name.to_string(),
telemetry_type: "ValueChanged".to_string(),
value: value.clone(),
timestamp: *timestamp,
};
telemetries.push(Box::new(telemetry) as Box<dyn Telemetry + Send>);
*current_value = Some(value.clone());
}
}
}
SubscriptionType::Periodical(_, reporting_interval) => {
let mut should_send = false;
for (value, timestamp) in &values {
let mut last_timestamp =
typed_subscription.last_telemetry_timestamp.lock().unwrap();
if let Some(last) = *last_timestamp {
match timestamp.duration_since(last) {
Ok(duration) if duration >= reporting_interval => {
should_send = true;
*last_timestamp = Some(*timestamp);
}
Ok(_) => {
// Continue accumulating samples
}
Err(_) => {
eprintln!("System time went backwards");
*last_timestamp = Some(*timestamp);
should_send = true;
}
}
} else {
*last_timestamp = Some(*timestamp);
should_send = true;
}
let telemetry = TypedTelemetry {
source_name: source_name.to_string(),
telemetry_type: "Periodical".to_string(),
value: value.clone(),
timestamp: *timestamp,
};
telemetries.push(Box::new(telemetry) as Box<dyn Telemetry + Send>);
}
if !should_send {
continue;
}
}
}
if !telemetries.is_empty() {
telemetry_sender
.send(telemetries)
.await
.map_err(|e| anyhow::anyhow!("Failed to send telemetry: {}", e))?;
}
}
}
Ok(())
}
/// Gets the current value of an telemetry source.
///
/// # Arguments
///
/// * `source_name` - The name of the telemetry source.
///
/// # Returns
///
/// A Result containing the current value and timestamp, or an error if the source is not found.
#[allow(dead_code)]
pub async fn get_value<T: Clone + Send + 'static>(
&self,
source_name: &str,
) -> Result<(T, SystemTime)> {
if let Some(source) = self.sources.get(source_name) {
if let Some(telemetry_source) = source.as_any().downcast_ref::<TelemetrySource<T>>() {
telemetry_source
.value_reader
.get_value()
.await
.map_err(|e| anyhow::anyhow!("Failed to get value: {}", e))
} else {
Err(anyhow::anyhow!("Failed to downcast source"))
}
} else {
Err(anyhow::anyhow!("Source not found: {}", source_name))
}
}
}
/// Telemetry Source Subscription.
///
/// Handle returned to a subscriber. Dropping it unsubscribes from the source.
pub struct TelemetrySourceSubscription<T: Clone + Send + PartialEq + 'static> {
/// Name of the telemetry source this subscription belongs to.
source_name: String,
/// Identifier of this subscription within the manager.
subscription_id: SubscriptionId,
/// MPSC receiver for subscriber to receive Telemetries.
pub telemetry_receiver: Receiver<Vec<Box<dyn Telemetry + Send>>>,
/// Subscription type requested when subscribing.
#[allow(dead_code)]
subscription_type: SubscriptionType,
/// Manager used to unsubscribe when this handle is dropped.
telemetry_source_manager: Arc<TelemetrySourceManager>,
/// Ties the handle to the value type `T` without storing a value.
_phantom: PhantomData<T>,
}
impl<T: Clone + Send + PartialEq + 'static> Drop for TelemetrySourceSubscription<T> {
    /// Unsubscribes from the source; a failure is reported on stderr only.
    fn drop(&mut self) {
        let outcome = self
            .telemetry_source_manager
            .unsubscribe_telemetries(&self.source_name, self.subscription_id);
        match outcome {
            Ok(()) => {}
            Err(e) => eprintln!("Error unsubscribing telemetries: {}", e),
        }
    }
}
// Debug is written by hand; the output lists only the source name, the
// subscription id and the subscription type.
impl<T: Clone + Send + PartialEq + 'static> fmt::Debug for TelemetrySourceSubscription<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("TelemetrySourceSubscription");
        builder.field("source_name", &self.source_name);
        builder.field("subscription_id", &self.subscription_id);
        builder.field("subscription_type", &self.subscription_type);
        builder.finish()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::Ordering;
use rand::Rng;
use tokio::time;
use tokio::sync::Mutex;
/// Mock `f64` reader whose state is guarded by `std::sync::Mutex`.
struct MockAsyncReadValue {
/// Timestamp up to which samples have already been generated.
last_read_time: std::sync::Mutex<SystemTime>,
/// Value most recently produced by `get_value`.
current_value: std::sync::Mutex<f64>,
}
impl MockAsyncReadValue {
    /// Creates a mock whose read cursor starts at the current time and whose value is 0.0.
    fn new() -> Self {
        let created_at = SystemTime::now();
        Self {
            last_read_time: std::sync::Mutex::new(created_at),
            current_value: std::sync::Mutex::new(0.0),
        }
    }
}
#[async_trait::async_trait]
impl AsyncReadValue<f64> for MockAsyncReadValue {
    /// Sleeps for one reporting interval, then returns random samples in `[0, 100)`.
    ///
    /// Samples are spaced `sampling_interval` apart, starting at the later of
    /// the stored read cursor and one reporting interval ago, and stopping
    /// before the current time. The cursor is then advanced past the last sample.
    async fn read_values(
        &self,
        sampling_interval: Duration,
        reporting_interval: Duration,
    ) -> Result<Vec<(f64, SystemTime)>, std::io::Error> {
        // Sleep interval is for mock purpose only
        time::sleep(reporting_interval).await;

        let now = SystemTime::now();
        let window_start = now - reporting_interval;

        // Never reach back further than one reporting interval.
        let mut cursor = {
            let mut stored = self.last_read_time.lock().unwrap();
            if *stored < window_start {
                *stored = window_start;
            }
            *stored
        };

        let mut samples = Vec::new();
        while cursor < now {
            let reading = rand::thread_rng().gen::<f64>() * 100.0;
            samples.push((reading, cursor));
            cursor = cursor.checked_add(sampling_interval).unwrap_or(now);
        }

        *self.last_read_time.lock().unwrap() = cursor;
        Ok(samples)
    }

    /// Generates a new random value in `[0, 100)`, stores it and returns it with the current time.
    async fn get_value(&self) -> Result<(f64, SystemTime), std::io::Error> {
        let sampled_at = SystemTime::now();
        let fresh = rand::thread_rng().gen::<f64>() * 100.0;
        *self.current_value.lock().unwrap() = fresh;
        Ok((fresh, sampled_at))
    }
}
/// End-to-end test of the manager: registers two sources, exercises the query
/// interfaces, checks that polling starts on subscription and stops when the
/// subscription is dropped, removes a source, and finally checks `get_value`.
#[tokio::test]
async fn test_telemetry_source_manager() {
let telemetry_source_manager = TelemetrySourceManager::new();
// Lower bound for the timestamps of all telemetries received below.
let start_time = SystemTime::now();
let mock_reader = Arc::new(MockAsyncReadValue::new());
// Add telemetry sources
let source1 = TelemetrySource::new(
"Source1".to_string(),
Some(vec![
("FRU".to_string(), "CPU".to_string()),
("ReadingType".to_string(), "Temperature".to_string()),
]),
mock_reader.clone(),
Duration::from_millis(10),
Duration::from_millis(1),
Duration::from_millis(50),
);
let _ = telemetry_source_manager.add_source::<f64>(source1).await;
let mock_reader_2 = Arc::new(MockAsyncReadValue::new());
let source2 = TelemetrySource::new(
"Source2".to_string(),
Some(vec![
("FRU".to_string(), "GPU".to_string()),
("ReadingType".to_string(), "Temperature".to_string()),
]),
mock_reader_2.clone(),
Duration::from_millis(20),
Duration::from_millis(2),
Duration::from_millis(100),
);
let _ = telemetry_source_manager.add_source::<f64>(source2).await;
// Query all telemetry sources
let all_sources = telemetry_source_manager
.query_sources_by_names(&[])
.unwrap();
assert_eq!(
all_sources.len(),
2,
"There should be 2 telemetry sources in the manager"
);
// Query by name
let source1 = telemetry_source_manager
.query_sources_by_names(&["Source1"])
.unwrap();
assert_eq!(source1.len(), 1);
assert_eq!(source1[0], "Source1");
// Query all secondary keys
let keys = telemetry_source_manager.query_all_secondary_keys();
assert!(keys.contains(&"FRU".to_string()));
assert!(keys.contains(&"ReadingType".to_string()));
// Query by secondary key
let cpu_sources = telemetry_source_manager
.query_sources_by_property(("FRU", "CPU"))
.unwrap();
assert_eq!(
cpu_sources.len(),
1,
"There should be 1 telemetry source with FRU 'CPU'"
);
// Verify that polling is not started
// NOTE: `source1` is a guard into the manager's source map; it is dropped
// explicitly before `remove_source` is called further down.
let source1 = telemetry_source_manager.sources.get("Source1").unwrap();
assert!(
!*source1.get_is_polling().read().unwrap(),
"Polling should not start before subscription"
);
// Subscribe to telemetries from a source
let mut handles: Vec<tokio::task::JoinHandle<()>> = Vec::new();
if let Ok(mut subscription) = telemetry_source_manager.subscribe_telemetries::<f64>(
"Source1",
SubscriptionType::Periodical(Duration::from_millis(10), Duration::from_millis(100)),
) {
assert!(
*source1.get_is_polling().read().unwrap(),
"Polling should start after subscription"
);
// The subscription is moved into its own task, which receives five
// batches and then drops the subscription.
let handle = tokio::spawn(async move {
let mut telemetry_count = 0;
while telemetry_count < 5 {
if let Some(telemetries) = subscription.telemetry_receiver.recv().await {
telemetry_count += 1;
for telemetry in telemetries.into_iter() {
let typed_telemetry = telemetry
.as_any()
.downcast_ref::<TypedTelemetry<f64>>()
.unwrap();
assert!(
typed_telemetry.timestamp <= SystemTime::now(),
"Telemetry timestamp is in the future"
);
assert!(
typed_telemetry.timestamp >= start_time,
"Telemetry timestamp is before start time"
);
}
}
}
drop(subscription);
});
handles.push(handle);
}
// Wait for all telemetry handlers to complete
for handle in handles {
handle.await.unwrap();
}
assert!(
!*source1.get_is_polling().read().unwrap(),
"Polling should stop after all subscriptions dropped"
);
// Drop this first so you can call remove_source on it
drop(source1);
// Remove an telemetry source
let _ = telemetry_source_manager.remove_source("Source1").await;
let remaining_sources = telemetry_source_manager
.query_sources_by_names(&[])
.unwrap();
assert_eq!(
remaining_sources.len(),
1,
"There should be 1 telemetry source remaining"
);
assert_eq!(remaining_sources[0], "Source2");
// Test get_value method
let value_result = telemetry_source_manager.get_value::<f64>("Source2").await;
assert!(value_result.is_ok(), "get_value should succeed");
let (value, timestamp) = value_result.unwrap();
assert!(
value >= 0.0 && value <= 100.0,
"Value should be between 0 and 100"
);
assert!(
timestamp <= SystemTime::now(),
"Timestamp should not be in the future"
);
// Test get_value for non-existent source
let error_result = telemetry_source_manager
.get_value::<f64>("NonExistentSource")
.await;
assert!(
error_result.is_err(),
"get_value should fail for non-existent source"
);
println!("Done!");
}
/// Test double that backs the `AsyncReadValue<f64>` implementation used by the tests.
///
/// NOTE(review): the fields are locked with `.lock().await`, so `Mutex` here is presumably an
/// async mutex (e.g. `tokio::sync::Mutex`) imported by the test module -- confirm.
struct MockAsyncReadValueF64 {
/// Timestamp cursor maintained by `read_values`; `new` initialises it to the creation time.
last_read_time: Mutex<SystemTime>,
/// Value handed back by `get_value`; `new` initialises it to `0.0`.
current_value: Mutex<f64>,
}
impl MockAsyncReadValueF64 {
    /// Creates a mock whose read cursor starts at the current wall-clock time and whose
    /// current value is `0.0`.
    fn new() -> Self {
        let created_at = SystemTime::now();
        let initial_value = 0.0_f64;
        Self {
            last_read_time: Mutex::new(created_at),
            current_value: Mutex::new(initial_value),
        }
    }
}
/// Mock reader producing random `f64` samples in the range `[0, 100)`.
#[async_trait::async_trait]
impl AsyncReadValue<f64> for MockAsyncReadValueF64 {
    /// Sleeps for `reporting_interval`, then fabricates one random sample per
    /// `sampling_interval` step between the read cursor and "now".
    ///
    /// The cursor starts at the later of the stored `last_read_time` and
    /// `now - reporting_interval`, so at most one reporting window is back-filled.
    /// After generating the batch the cursor is written back to `last_read_time`.
    ///
    /// # Arguments
    ///
    /// * `sampling_interval` - Spacing between the timestamps of generated samples.
    /// * `reporting_interval` - How long to wait before producing the batch.
    ///
    /// # Returns
    ///
    /// The generated `(value, timestamp)` pairs; this mock never returns an error.
    async fn read_values(
        &self,
        sampling_interval: Duration,
        reporting_interval: Duration,
    ) -> Result<Vec<(f64, SystemTime)>, std::io::Error> {
        time::sleep(reporting_interval).await;
        let now = SystemTime::now();
        let mut values = Vec::new();
        let mut last_time = {
            let mut last_time = self.last_read_time.lock().await;
            let new_last_time = std::cmp::max(*last_time, now - reporting_interval);
            *last_time = new_last_time;
            new_last_time
        };
        while last_time < now {
            let value = rand::thread_rng().gen::<f64>() * 100.0;
            values.push((value, last_time));
            last_time = if sampling_interval.is_zero() {
                // A zero step would never advance the cursor and the loop would spin
                // forever; emit a single sample and jump straight to `now` instead.
                now
            } else {
                last_time.checked_add(sampling_interval).unwrap_or(now)
            };
        }
        {
            let mut last_time_lock = self.last_read_time.lock().await;
            *last_time_lock = last_time;
        }
        Ok(values)
    }

    /// Returns the stored current value paired with the current wall-clock time.
    async fn get_value(&self) -> Result<(f64, SystemTime), std::io::Error> {
        let now = SystemTime::now();
        let value = *self.current_value.lock().await;
        Ok((value, now))
    }
}
/// Test double that backs the `AsyncReadValue<Arc<[u8]>>` implementation used by the tests.
///
/// Why is the mock not a single struct generic over `T`? According to the original author,
/// that would make the `downcast_ref` calls fail: the `TelemetrySourceManager` does not know
/// the concrete `T` of each `TelemetrySource`.
struct MockAsyncReadValueBytes {
/// Timestamp cursor maintained by `read_values`; `new` initialises it to the creation time.
last_read_time: Mutex<SystemTime>,
/// Value handed back by `get_value`; `new` initialises it to ten zero bytes.
current_value: Mutex<Arc<[u8]>>,
}
impl MockAsyncReadValueBytes {
    /// Creates a mock whose read cursor starts at the current wall-clock time and whose
    /// current value is a 10-byte, all-zero slice.
    fn new() -> Self {
        let created_at = SystemTime::now();
        let zeroed: Arc<[u8]> = Arc::new([0u8; 10]);
        Self {
            last_read_time: Mutex::new(created_at),
            current_value: Mutex::new(zeroed),
        }
    }
}
/// Mock reader producing random 10-byte payloads.
#[async_trait::async_trait]
impl AsyncReadValue<Arc<[u8]>> for MockAsyncReadValueBytes {
    /// Sleeps for `reporting_interval`, then fabricates one random 10-byte sample per
    /// `sampling_interval` step between the read cursor and "now".
    ///
    /// The cursor starts at the later of the stored `last_read_time` and
    /// `now - reporting_interval`, so at most one reporting window is back-filled.
    /// After generating the batch the cursor is written back to `last_read_time`.
    ///
    /// # Arguments
    ///
    /// * `sampling_interval` - Spacing between the timestamps of generated samples.
    /// * `reporting_interval` - How long to wait before producing the batch.
    ///
    /// # Returns
    ///
    /// The generated `(value, timestamp)` pairs; this mock never returns an error.
    async fn read_values(
        &self,
        sampling_interval: Duration,
        reporting_interval: Duration,
    ) -> Result<Vec<(Arc<[u8]>, SystemTime)>, std::io::Error> {
        time::sleep(reporting_interval).await;
        let now = SystemTime::now();
        let mut values = Vec::new();
        let mut last_time = {
            let mut last_time = self.last_read_time.lock().await;
            let new_last_time = std::cmp::max(*last_time, now - reporting_interval);
            *last_time = new_last_time;
            new_last_time
        };
        while last_time < now {
            let value: Arc<[u8]> = Arc::new(rand::thread_rng().gen::<[u8; 10]>());
            values.push((value, last_time));
            last_time = if sampling_interval.is_zero() {
                // A zero step would never advance the cursor and the loop would spin
                // forever; emit a single sample and jump straight to `now` instead.
                now
            } else {
                last_time.checked_add(sampling_interval).unwrap_or(now)
            };
        }
        {
            let mut last_time_lock = self.last_read_time.lock().await;
            *last_time_lock = last_time;
        }
        Ok(values)
    }

    /// Returns a clone of the stored current value paired with the current wall-clock time.
    async fn get_value(&self) -> Result<(Arc<[u8]>, SystemTime), std::io::Error> {
        let now = SystemTime::now();
        let value = self.current_value.lock().await.clone();
        Ok((value, now))
    }
}
/// Registers one `f64` source and one `Arc<[u8]>` source on the same manager and checks that
/// `get_value` and `subscribe_telemetries` work for both value types, and that `get_value`
/// fails for an unknown source name.
///
/// Setup failures and closed channels fail the test explicitly instead of being silently
/// skipped, matching the other subscription tests in this module.
#[tokio::test]
async fn test_telemetry_source_manager_with_different_types() {
    let telemetry_source_manager = TelemetrySourceManager::new();
    // Test with f64
    let mock_reader_f64 = Arc::new(MockAsyncReadValueF64::new());
    let source_f64 = TelemetrySource::new(
        "SourceF64".to_string(),
        None,
        mock_reader_f64.clone(),
        Duration::from_millis(10),
        Duration::from_millis(1),
        Duration::from_millis(50),
    );
    telemetry_source_manager
        .add_source::<f64>(source_f64)
        .await
        .expect("adding SourceF64 should succeed");
    // Test with Arc<[u8]>
    let mock_reader_bytes = Arc::new(MockAsyncReadValueBytes::new());
    let source_bytes = TelemetrySource::new(
        "SourceBytes".to_string(),
        None,
        mock_reader_bytes.clone(),
        Duration::from_millis(20),
        Duration::from_millis(2),
        Duration::from_millis(100),
    );
    telemetry_source_manager
        .add_source::<Arc<[u8]>>(source_bytes)
        .await
        .expect("adding SourceBytes should succeed");
    // Test get_value for f64
    let result_f64 = telemetry_source_manager.get_value::<f64>("SourceF64").await;
    assert!(result_f64.is_ok(), "get_value should succeed for f64");
    let (value_f64, timestamp_f64) = result_f64.unwrap();
    assert!(
        (0.0..=100.0).contains(&value_f64),
        "f64 value should be between 0 and 100"
    );
    assert!(
        timestamp_f64 <= SystemTime::now(),
        "f64 timestamp should not be in the future"
    );
    // Test get_value for Arc<[u8]>
    let result_bytes = telemetry_source_manager
        .get_value::<Arc<[u8]>>("SourceBytes")
        .await;
    assert!(
        result_bytes.is_ok(),
        "get_value should succeed for Arc<[u8]>"
    );
    let (value_bytes, timestamp_bytes) = result_bytes.unwrap();
    assert_eq!(value_bytes.len(), 10, "Byte array should have length 10");
    assert!(
        timestamp_bytes <= SystemTime::now(),
        "Bytes timestamp should not be in the future"
    );
    // Test subscribing to telemetries for both types
    let mut subscription_f64 = telemetry_source_manager
        .subscribe_telemetries::<f64>("SourceF64", SubscriptionType::OnChange)
        .unwrap();
    let mut subscription_bytes = telemetry_source_manager
        .subscribe_telemetries::<Arc<[u8]>>("SourceBytes", SubscriptionType::OnChange)
        .unwrap();
    // Receive and verify telemetries for f64. A timeout or a closed channel is a failure,
    // otherwise the per-item checks below would be skipped without anyone noticing.
    let telemetries_f64 = tokio::time::timeout(
        Duration::from_secs(6),
        subscription_f64.telemetry_receiver.recv(),
    )
    .await
    .expect("Timeout waiting for f64 telemetry")
    .expect("f64 telemetry channel closed unexpectedly");
    for telemetry in telemetries_f64 {
        let typed_telemetry = telemetry
            .as_any()
            .downcast_ref::<TypedTelemetry<f64>>()
            .expect("telemetry from SourceF64 should be TypedTelemetry<f64>");
        assert!(
            (0.0..=100.0).contains(&typed_telemetry.value),
            "f64 telemetry value should be between 0 and 100"
        );
    }
    // Receive and verify telemetries for Arc<[u8]>
    let telemetries_bytes = tokio::time::timeout(
        Duration::from_secs(6),
        subscription_bytes.telemetry_receiver.recv(),
    )
    .await
    .expect("Timeout waiting for Arc<[u8]> telemetry")
    .expect("Arc<[u8]> telemetry channel closed unexpectedly");
    for telemetry in telemetries_bytes {
        let typed_telemetry = telemetry
            .as_any()
            .downcast_ref::<TypedTelemetry<Arc<[u8]>>>()
            .expect("telemetry from SourceBytes should be TypedTelemetry<Arc<[u8]>>");
        assert_eq!(
            typed_telemetry.value.len(),
            10,
            "Byte array in telemetry should have length 10"
        );
    }
    // Test get_value for non-existent source
    let error_result = telemetry_source_manager
        .get_value::<f64>("NonExistentSource")
        .await;
    assert!(
        error_result.is_err(),
        "get_value should fail for non-existent source"
    );
}
/// Test double that records the intervals most recently passed to its `read_values`, so tests
/// can inspect which subscription parameters reached the reader.
struct MockAsyncReadSubParam {
/// Last `sampling_interval` seen by `read_values`, in whole milliseconds; `new` sets it to 0.
sampling_interval: Arc<AtomicUsize>,
/// Last `reporting_interval` seen by `read_values`, in whole milliseconds; `new` sets it to 0.
reporting_interval: Arc<AtomicUsize>,
}
impl MockAsyncReadSubParam {
    /// Creates a mock with both recorded intervals reset to zero.
    fn new() -> Self {
        // Each field gets its own independent counter.
        let zeroed = || Arc::new(AtomicUsize::new(0));
        Self {
            sampling_interval: zeroed(),
            reporting_interval: zeroed(),
        }
    }
}
/// Mock reader that records the requested intervals and emits constant `0.0` samples.
#[async_trait::async_trait]
impl AsyncReadValue<f64> for MockAsyncReadSubParam {
    /// Stores both intervals (in whole milliseconds) for later inspection, then emits one
    /// `0.0` sample per `sampling_interval` until `reporting_interval` has elapsed.
    async fn read_values(
        &self,
        sampling_interval: Duration,
        reporting_interval: Duration,
    ) -> Result<Vec<(f64, SystemTime)>, std::io::Error> {
        let sampling_ms = sampling_interval.as_millis() as usize;
        self.sampling_interval.store(sampling_ms, Ordering::SeqCst);
        let reporting_ms = reporting_interval.as_millis() as usize;
        self.reporting_interval.store(reporting_ms, Ordering::SeqCst);

        let started = tokio::time::Instant::now();
        let mut samples = Vec::new();
        loop {
            if started.elapsed() >= reporting_interval {
                break;
            }
            samples.push((0.0, SystemTime::now()));
            tokio::time::sleep(sampling_interval).await;
        }
        Ok(samples)
    }

    /// Returns a constant `0.0` stamped with the current wall-clock time.
    async fn get_value(&self) -> Result<(f64, SystemTime), std::io::Error> {
        let stamped = (0.0, SystemTime::now());
        Ok(stamped)
    }
}
/// Verifies that a single periodical subscription reaches the reader with exactly the
/// requested sampling/reporting intervals, and that the first batch holds roughly
/// `reporting_interval / sampling_interval` telemetries (plus or minus one).
#[tokio::test]
async fn test_single_subscription_intervals() {
    let manager = TelemetrySourceManager::new();
    let reader = Arc::new(MockAsyncReadSubParam::new());
    let test_source = TelemetrySource::new(
        "TestSource".to_string(),
        None,
        reader.clone(),
        Duration::from_secs(1),
        Duration::from_millis(10),
        Duration::from_secs(10),
    );
    manager.add_source::<f64>(test_source).await.unwrap();

    let sampling = Duration::from_millis(20);
    let reporting = Duration::from_millis(100);
    let mut subscription = manager
        .subscribe_telemetries::<f64>(
            "TestSource",
            SubscriptionType::Periodical(sampling, reporting),
        )
        .unwrap();

    // Wait (bounded) for the first batch of telemetry.
    let first_batch = tokio::time::timeout(
        Duration::from_millis(150),
        subscription.telemetry_receiver.recv(),
    )
    .await;
    let telemetries = match first_batch {
        Err(_) => panic!("Timeout waiting for telemetry"),
        Ok(None) => panic!("Channel closed unexpectedly"),
        Ok(Some(batch)) => batch,
    };

    // The reader must have been driven with the subscription's own intervals.
    let seen_sampling_ms = reader.sampling_interval.load(Ordering::SeqCst);
    assert_eq!(
        seen_sampling_ms,
        sampling.as_millis() as usize,
        "Sampling interval should match the subscription"
    );
    let seen_reporting_ms = reader.reporting_interval.load(Ordering::SeqCst);
    assert_eq!(
        seen_reporting_ms,
        reporting.as_millis() as usize,
        "Reporting interval should match the subscription"
    );

    // Batch size should be within one sample of reporting / sampling.
    let expected_count = reporting.as_millis() / sampling.as_millis();
    let lower_bound = expected_count - 1;
    let upper_bound = expected_count + 1;
    let received = telemetries.len() as u128;
    assert!(
        lower_bound <= received && received <= upper_bound,
        "Number of telemetries ({}) should be approximately equal to reporting_interval/sampling_interval ({})",
        telemetries.len(),
        expected_count
    );
}
/// Verifies how the intervals handed to the reader change as subscriptions come and go:
///
/// 1. With two periodical subscriptions active, the reader sees 20 ms sampling and 60 ms
///    reporting (what the assertion messages call the minimum of the two subscriptions).
/// 2. After the first subscription is dropped, the reader sees the second one's intervals.
/// 3. After all are dropped, a new 1 ms subscription results in 10 ms sampling at the reader
///    (the assertion message attributes this to `minimum_update_interval`).
#[tokio::test]
async fn test_multiple_subscriptions_and_unsubscribe() {
    let manager = TelemetrySourceManager::new();
    let reader = Arc::new(MockAsyncReadSubParam::new());
    let test_source = TelemetrySource::new(
        "TestSource".to_string(),
        None,
        reader.clone(),
        Duration::from_secs(1),
        Duration::from_millis(10),
        Duration::from_secs(10),
    );
    manager.add_source::<f64>(test_source).await.unwrap();

    let (sampling_a, reporting_a) = (Duration::from_millis(20), Duration::from_millis(80));
    let mut sub_a = manager
        .subscribe_telemetries::<f64>(
            "TestSource",
            SubscriptionType::Periodical(sampling_a, reporting_a),
        )
        .unwrap();
    let (sampling_b, reporting_b) = (Duration::from_millis(30), Duration::from_millis(60));
    let mut sub_b = manager
        .subscribe_telemetries::<f64>(
            "TestSource",
            SubscriptionType::Periodical(sampling_b, reporting_b),
        )
        .unwrap();

    // Phase 1: wait for the first telemetry from either subscription.
    let recv_timeout = Duration::from_millis(100);
    let either_received = tokio::time::timeout(recv_timeout, async {
        tokio::select! {
            _ = sub_a.telemetry_receiver.recv() => {},
            _ = sub_b.telemetry_receiver.recv() => {},
        }
    })
    .await;
    if either_received.is_err() {
        panic!("Timeout waiting for telemetry");
    }
    assert_eq!(
        reader.sampling_interval.load(Ordering::SeqCst),
        sampling_a.as_millis() as usize,
        "Sampling interval should be the minimum of the two subscriptions"
    );
    assert_eq!(
        reader.reporting_interval.load(Ordering::SeqCst),
        reporting_b.as_millis() as usize,
        "Reporting interval should be the minimum of the two subscriptions"
    );

    // Phase 2: unsubscribe the first subscription and give the change time to take effect.
    drop(sub_a);
    tokio::time::sleep(Duration::from_millis(100)).await;
    // Wait for the next telemetry from the remaining subscription.
    match tokio::time::timeout(recv_timeout, sub_b.telemetry_receiver.recv()).await {
        Err(_) => panic!("Timeout waiting for telemetry after unsubscribe"),
        Ok(None) => panic!("Channel closed unexpectedly"),
        Ok(Some(_)) => {}
    }
    assert_eq!(
        reader.sampling_interval.load(Ordering::SeqCst),
        sampling_b.as_millis() as usize,
        "Sampling interval should now match the remaining subscription"
    );
    assert_eq!(
        reader.reporting_interval.load(Ordering::SeqCst),
        reporting_b.as_millis() as usize,
        "Reporting interval should now match the remaining subscription"
    );

    // Phase 3: unsubscribe the second subscription and wait again.
    drop(sub_b);
    tokio::time::sleep(Duration::from_millis(100)).await;
    // Trigger a new read by creating a temporary subscription.
    let mut temp_subscription = manager
        .subscribe_telemetries::<f64>(
            "TestSource",
            SubscriptionType::Periodical(Duration::from_millis(1), Duration::from_millis(10)),
        )
        .unwrap();
    let temp_batch =
        tokio::time::timeout(recv_timeout, temp_subscription.telemetry_receiver.recv()).await;
    match temp_batch {
        Err(_) => panic!("Timeout waiting for telemetry after all unsubscribes"),
        Ok(None) => panic!("Channel closed unexpectedly"),
        Ok(Some(_)) => {}
    }
    let floor_ms = Duration::from_millis(10).as_millis() as usize;
    assert_eq!(
        reader.sampling_interval.load(Ordering::SeqCst),
        floor_ms,
        "Sampling interval should not lower than minimum_update_interval"
    );
}
}