| // Copyright 2025 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #![doc = include_str!("README.md")] |
| |
| use anyhow::Result; |
| use dashmap::DashMap; |
| use std::any::Any; |
| use std::collections::HashMap; |
| use std::fmt; |
| use std::fmt::Debug; |
| use std::marker::PhantomData; |
| use std::sync::Mutex; |
| use std::sync::{ |
| atomic::{AtomicBool, AtomicUsize, Ordering}, |
| Arc, |
| }; |
| use std::time::{Duration, SystemTime}; |
| use tokio::sync::mpsc::{Receiver, Sender}; |
| use tokio::sync::{mpsc, Notify, RwLock}; |
| |
| /// A trait for asynchronous value reading. |
| #[async_trait::async_trait] |
| pub trait AsyncReadValue<T: Clone + Send + 'static>: Send + Sync { |
| /// Read values and return them with timestamps. |
| /// |
| /// # Arguments |
| /// |
| /// * `sampling_interval` - The interval between each sample. |
| /// * `reporting_interval` - The interval for reporting batched values. |
| /// |
| /// # Returns |
| /// |
| /// A vector of values with their corresponding timestamps. |
| async fn read_values( |
| &self, |
| sampling_interval: Duration, |
| reporting_interval: Duration, |
| ) -> Result<Vec<(T, SystemTime)>, std::io::Error>; |
| |
| /// Read current value and return it with timestamp. |
| /// |
| /// # Returns |
| /// |
| /// A value with its corresponding timestamp. |
| #[allow(dead_code)] |
| async fn get_value(&self) -> Result<(T, SystemTime), std::io::Error>; |
| } |
| |
| /// Telemetry Source Data Model. |
| pub struct TelemetrySource<T: Clone + Send + 'static> { |
| /// Primary key. |
| name: String, |
| /// Secondary keys, like Some(vec![("FRU", "CPU"), ("ReadingType", "Temperature")]). |
| secondary_keys: Option<Vec<(String, String)>>, |
| /// Client provided reader to get telemetry source value update. |
| value_reader: Arc<dyn AsyncReadValue<T>>, |
| /// Default update interval for polling telemetry source value. |
| default_update_interval: Duration, |
| /// Lowest limit of update interval for telemetry source HW. |
| minimum_update_interval: Duration, |
| /// Flag to stop background telemetry source HW polling task. |
| stop_flag: Arc<AtomicBool>, |
| /// Current update interval for polling telemetry source value. |
| current_update_interval: AtomicUsize, // Store in milliseconds |
| /// Current reporting interval for batch reporting. |
| current_reporting_interval: AtomicUsize, // Store in milliseconds |
| /// Flag to wake up the polling task. |
| wakeup_flag: Arc<Notify>, |
| /// Flag to indicate if the telemetry source is currently polling. |
| is_polling: Arc<std::sync::RwLock<bool>>, |
| } |
| |
| impl<T: Clone + Send + 'static> TelemetrySource<T> { |
| /// Creates a new `TelemetrySource`. |
| /// |
| /// # Arguments |
| /// |
| /// * `name` - A string that holds the primary key for the telemetry source. |
| /// * `secondary_keys` - An optional vector of key-value pairs representing additional keys. |
| /// * `value_reader` - An `Arc` containing a trait object that implements `AsyncReadValue`. |
| /// * `default_update_interval` - The default duration between value updates. |
| /// * `minimum_update_interval` - The minimum allowable duration between value updates. |
| /// * `reporting_interval` - The interval of batch reporting of values. |
| /// |
| /// # Returns |
| /// |
| /// A new `Box<TelemetrySource>` instance. |
| pub fn new( |
| name: String, |
| secondary_keys: Option<Vec<(String, String)>>, |
| value_reader: Arc<dyn AsyncReadValue<T>>, |
| default_update_interval: Duration, |
| minimum_update_interval: Duration, |
| reporting_interval: Duration, |
| ) -> Box<Self> { |
| Box::new(Self { |
| name, |
| secondary_keys, |
| value_reader, |
| default_update_interval, |
| minimum_update_interval, |
| stop_flag: Arc::new(AtomicBool::new(false)), |
| current_update_interval: AtomicUsize::new(default_update_interval.as_millis() as usize), |
| current_reporting_interval: AtomicUsize::new(reporting_interval.as_millis() as usize), |
| wakeup_flag: Arc::new(Notify::new()), |
| is_polling: Arc::new(std::sync::RwLock::new(false)), |
| }) |
| } |
| } |
| |
| impl<T: Clone + Send + 'static> fmt::Debug for TelemetrySource<T> { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| f.debug_struct("TelemetrySource") |
| .field("name", &self.name) |
| .field("secondary_keys", &self.secondary_keys) |
| .field("default_update_interval", &self.default_update_interval) |
| .field("minimum_update_interval", &self.minimum_update_interval) |
| .field("current_update_interval", &self.get_current_interval()) |
| .field( |
| "current_reporting_interval", |
| &self.get_current_reporting_interval(), |
| ) |
| .finish() |
| } |
| } |
| |
| /// Define the TelemetrySourceTrait for type erase purpose, allowing different types |
| /// of telemetry sources to be stored and managed uniformly in the TelemetrySourceManager |
| pub trait TelemetrySourceTrait: Send + Sync { |
| /// Gets the name of the telemetry source. |
| fn get_name(&self) -> &str; |
| |
| /// Gets the secondary keys of the telemetry source. |
| fn get_secondary_keys(&self) -> Option<&Vec<(String, String)>>; |
| |
| /// Sets the stop flag for the telemetry source. |
| #[allow(dead_code)] |
| fn set_stop_flag(&self, value: bool); |
| |
| /// Gets the current polling interval. |
| fn get_current_interval(&self) -> Duration; |
| |
| /// Sets the current polling interval. |
| fn set_current_interval(&self, new_interval: Duration); |
| |
| /// Gets the current reporting interval. |
| fn get_current_reporting_interval(&self) -> Duration; |
| |
| /// Sets the current reporting interval. |
| fn set_current_reporting_interval(&self, new_interval: Duration); |
| |
| /// Gets the default update interval. |
| fn get_default_update_interval(&self) -> Duration; |
| |
| /// Gets the wakeup flag for the telemetry source. |
| fn get_wakeup_flag(&self) -> Arc<Notify>; |
| |
| /// Gets the is_polling flag for the telemetry source. |
| fn get_is_polling(&self) -> Arc<std::sync::RwLock<bool>>; |
| |
| /// Converts the trait object to Any. |
| fn as_any(&self) -> &dyn Any; |
| } |
| |
| impl<T: Clone + Send + 'static> TelemetrySourceTrait for TelemetrySource<T> { |
| fn get_name(&self) -> &str { |
| &self.name |
| } |
| |
| fn get_secondary_keys(&self) -> Option<&Vec<(String, String)>> { |
| self.secondary_keys.as_ref() |
| } |
| |
| fn set_current_interval(&self, new_interval: Duration) { |
| let new_interval_ms = std::cmp::max( |
| new_interval.as_millis(), |
| self.minimum_update_interval.as_millis(), |
| ); |
| let new_interval_ms = |
| std::cmp::min(new_interval_ms, self.default_update_interval.as_millis()) as usize; |
| self.current_update_interval |
| .store(new_interval_ms, Ordering::SeqCst); |
| if self.current_reporting_interval.load(Ordering::SeqCst) < new_interval_ms { |
| self.set_current_reporting_interval(Duration::from_millis(new_interval_ms as u64)); |
| } |
| } |
| |
| fn set_current_reporting_interval(&self, new_interval: Duration) { |
| let new_interval_ms = std::cmp::max( |
| new_interval.as_millis() as usize, |
| self.current_update_interval.load(Ordering::SeqCst), |
| ); |
| self.current_reporting_interval |
| .store(new_interval_ms, Ordering::SeqCst); |
| } |
| |
| fn get_current_interval(&self) -> Duration { |
| Duration::from_millis(self.current_update_interval.load(Ordering::SeqCst) as u64) |
| } |
| |
| fn get_current_reporting_interval(&self) -> Duration { |
| Duration::from_millis(self.current_reporting_interval.load(Ordering::SeqCst) as u64) |
| } |
| |
| fn get_default_update_interval(&self) -> Duration { |
| self.default_update_interval |
| } |
| |
| fn set_stop_flag(&self, value: bool) { |
| self.stop_flag.store(value, Ordering::SeqCst); |
| } |
| |
| fn get_wakeup_flag(&self) -> Arc<Notify> { |
| self.wakeup_flag.clone() |
| } |
| |
| fn get_is_polling(&self) -> Arc<std::sync::RwLock<bool>> { |
| self.is_polling.clone() |
| } |
| |
| fn as_any(&self) -> &dyn Any { |
| self |
| } |
| } |
| |
/// A single telemetry record: one sampled value of a telemetry source,
/// produced either by a value change or by a periodic update.
#[derive(Clone, Debug, PartialEq)]
pub struct TypedTelemetry<T>
where
    T: Clone + Send + 'static,
{
    /// Name of the telemetry source the record belongs to.
    pub source_name: String,
    /// Telemetry type as defined in the Redfish spec for the telemetry.
    pub telemetry_type: String,
    /// The sampled value.
    pub value: T,
    /// Time at which the value was sampled.
    pub timestamp: SystemTime,
}
| |
/// Type-agnostic view of a telemetry record.
///
/// Lets the system handle telemetries of different value types without
/// knowing their specific types.
pub trait Telemetry: Send + Debug {
    /// Returns the name of the source that produced the telemetry.
    #[allow(dead_code)]
    fn source_name(&self) -> &str;

    /// Returns the telemetry type.
    #[allow(dead_code)]
    fn telemetry_type(&self) -> &str;

    /// Returns the timestamp of the telemetry.
    #[allow(dead_code)]
    fn timestamp(&self) -> SystemTime;

    /// Returns the telemetry as `&dyn Any` so callers can downcast it.
    fn as_any(&self) -> &dyn Any;
}
| |
| impl<T: Clone + Send + Debug + 'static> Telemetry for TypedTelemetry<T> { |
| fn source_name(&self) -> &str { |
| &self.source_name |
| } |
| |
| fn telemetry_type(&self) -> &str { |
| &self.telemetry_type |
| } |
| |
| fn timestamp(&self) -> SystemTime { |
| self.timestamp |
| } |
| |
| fn as_any(&self) -> &dyn Any { |
| self |
| } |
| } |
| |
/// Sampling interval carried by a periodical subscription.
pub type SampleInterval = Duration;
/// Reporting interval carried by a periodical subscription.
pub type ReportInterval = Duration;
| |
| /// Subscription Type Enum. |
| #[derive(Clone, Debug, PartialEq)] |
| pub enum SubscriptionType { |
| /// Whenever value changed, send an Telemetry. |
| OnChange, |
| /// Sampling interval, when it's shorter than the telemetry source's update_interval, the Telemetry |
| /// will be sent at update_interval (but never shorter than the minimum_update_interval). |
| /// Reporting interval, which should be no shorter than the sampling interval |
| Periodical(SampleInterval, ReportInterval), |
| } |
| |
| /// Type alias for telemetry source subscription ID. |
| type SubscriptionId = usize; |
| /// Type alias for telemetry sender. |
| type TelemetrySender = Sender<Vec<Box<dyn Telemetry + Send>>>; |
| |
| /// Represents a subscription to a telemetry source. |
| /// |
| /// This trait defines the interface for managing subscriptions to telemetry data. |
| /// Implementations of this trait should be able to handle different types of |
| /// telemetry data and subscription models. |
| pub trait Subscription: Send + Sync { |
| /// Returns a reference to the telemetry sender associated with this subscription. |
| /// |
| /// # Returns |
| /// |
| /// A reference to the `TelemetrySender` used to send telemetry data. |
| fn get_sender(&self) -> &TelemetrySender; |
| |
| /// Returns the type of this subscription. |
| /// |
| /// # Returns |
| /// |
| /// A reference to the `SubscriptionType` indicating whether this is an OnChange |
| /// or Periodical subscription. |
| fn get_subscription_type(&self) -> &SubscriptionType; |
| |
| /// Retrieves the timestamp of the last telemetry update. |
| /// |
| /// # Returns |
| /// |
| /// An `Option<SystemTime>` representing the timestamp of the last telemetry update, |
| /// or `None` if no update has occurred yet. |
| #[allow(dead_code)] |
| fn get_last_telemetry_timestamp(&self) -> Option<SystemTime>; |
| |
| /// Sets the timestamp of the last telemetry update. |
| /// |
| /// # Arguments |
| /// |
| /// * `timestamp` - A `SystemTime` representing the new last update timestamp. |
| #[allow(dead_code)] |
| fn set_last_telemetry_timestamp(&self, timestamp: SystemTime); |
| |
| /// Retrieves the current value of the telemetry data. |
| /// |
| /// # Returns |
| /// |
| /// A `Box<dyn Any>` containing the current telemetry value. |
| #[allow(dead_code)] |
| fn get_current_value(&self) -> Box<dyn Any>; |
| |
| /// Sets the current value of the telemetry data. |
| /// |
| /// # Arguments |
| /// |
| /// * `value` - A `Box<dyn Any>` containing the new telemetry value. |
| #[allow(dead_code)] |
| fn set_current_value(&self, value: Box<dyn Any>); |
| |
| /// Converts this trait object to `&dyn Any`. |
| /// |
| /// This method is used for downcasting. |
| /// |
| /// # Returns |
| /// |
| /// A reference to `dyn Any`. |
| #[allow(dead_code)] |
| fn as_any(&self) -> &dyn Any; |
| |
| /// Converts this mutable trait object to `&mut dyn Any`. |
| /// |
| /// This method is used for mutable downcasting. |
| /// |
| /// # Returns |
| /// |
| /// A mutable reference to `dyn Any`. |
| fn as_any_mut(&mut self) -> &mut dyn Any; |
| } |
| |
| /// A typed implementation of the `Subscription` trait. |
| /// |
| /// This struct provides a concrete implementation of `Subscription` for a specific |
| /// telemetry data type `T`. |
| /// |
| /// # Type Parameters |
| /// |
| /// * `T`: The type of telemetry data this subscription handles. It must be `Clone`, |
| /// `Send`, and have a `'static` lifetime. |
| pub struct TypedSubscription<T: Clone + Send + 'static> { |
| /// The sender used to transmit telemetry data. |
| sender: TelemetrySender, |
| |
| /// The type of this subscription (OnChange or Periodical). |
| subscription_type: SubscriptionType, |
| |
| /// The timestamp of the last telemetry update, wrapped in a mutex for thread-safe access. |
| last_telemetry_timestamp: Mutex<Option<SystemTime>>, |
| |
| /// The current value of the telemetry data, wrapped in a mutex for thread-safe access. |
| current_value: Mutex<Option<T>>, |
| } |
| |
| impl<T: Clone + Send + 'static> Subscription for TypedSubscription<T> { |
| fn get_sender(&self) -> &TelemetrySender { |
| &self.sender |
| } |
| |
| fn get_subscription_type(&self) -> &SubscriptionType { |
| &self.subscription_type |
| } |
| |
| fn get_last_telemetry_timestamp(&self) -> Option<SystemTime> { |
| *self.last_telemetry_timestamp.lock().unwrap() |
| } |
| |
| fn set_last_telemetry_timestamp(&self, timestamp: SystemTime) { |
| *self.last_telemetry_timestamp.lock().unwrap() = Some(timestamp); |
| } |
| |
| fn get_current_value(&self) -> Box<dyn Any> { |
| Box::new(self.current_value.lock().unwrap().clone()) |
| } |
| |
| fn set_current_value(&self, value: Box<dyn Any>) { |
| if let Ok(typed_value) = value.downcast::<T>() { |
| *self.current_value.lock().unwrap() = Some(*typed_value); |
| } |
| } |
| |
| fn as_any(&self) -> &dyn Any { |
| self |
| } |
| |
| fn as_any_mut(&mut self) -> &mut dyn Any { |
| self |
| } |
| } |
| |
| pub struct TelemetrySourceManager { |
| /// Object store with primary key. |
| sources: DashMap<String, Arc<Box<dyn TelemetrySourceTrait>>>, |
| /// Secondary key maps, as <key_name, HashMap<key_value, Vec<primary_keys>>. |
| secondary_map: DashMap<String, HashMap<String, Vec<String>>>, |
| /// Subscription map as <telemetry source name, Subscriptions>. |
| telemetry_subscriptions: DashMap<String, HashMap<SubscriptionId, Box<dyn Subscription>>>, |
| /// Atomic variable used to generate next subscription_id. |
| next_subscription_id: AtomicUsize, |
| /// The lock is only needed for add/remove telemetry sources. |
| lock: RwLock<()>, |
| } |
| |
| impl TelemetrySourceManager { |
| /// Creates a new `TelemetrySourceManager`. |
| /// |
| /// # Returns |
| /// |
| /// A new `Arc<TelemetrySourceManager>` instance. |
| pub fn new() -> Arc<Self> { |
| Arc::new(Self { |
| sources: DashMap::new(), |
| secondary_map: DashMap::new(), |
| telemetry_subscriptions: DashMap::new(), |
| next_subscription_id: AtomicUsize::new(0), |
| lock: RwLock::new(()), |
| }) |
| } |
| |
| /// Adds an telemetry source to the manager. |
| /// |
| /// # Arguments |
| /// |
| /// * `source` - The telemetry source to be added. |
| /// |
| /// # Returns |
| /// |
| /// A Result indicating success or failure. |
| pub async fn add_source<T: Clone + Send + PartialEq + Debug + 'static>( |
| self: &Arc<Self>, |
| source: Box<dyn TelemetrySourceTrait>, |
| ) -> Result<()> { |
| let _write_guard = self.lock.write().await; |
| let source_name = source.get_name().to_string(); |
| if let Some(secondary_keys) = source.get_secondary_keys() { |
| for (key_name, key_value) in secondary_keys { |
| self.secondary_map |
| .entry(key_name.clone()) |
| .or_default() |
| .entry(key_value.clone()) |
| .or_default() |
| .push(source_name.clone()); |
| } |
| } |
| let source = Arc::new(source); |
| self.sources.insert(source_name.clone(), source.clone()); |
| let manager = self.clone(); |
| |
| tokio::spawn(async move { |
| loop { |
| if source |
| .as_any() |
| .downcast_ref::<TelemetrySource<T>>() |
| .expect("Failed to downcast source") |
| .stop_flag |
| .load(Ordering::SeqCst) |
| { |
| break; |
| } |
| |
| source.get_wakeup_flag().notified().await; |
| |
| if !*source.get_is_polling().read().unwrap() { |
| continue; |
| } |
| |
| loop { |
| if source |
| .as_any() |
| .downcast_ref::<TelemetrySource<T>>() |
| .expect("Failed to downcast source") |
| .stop_flag |
| .load(Ordering::SeqCst) |
| { |
| return; |
| } |
| |
| let update_interval = source.get_current_interval(); |
| let reporting_interval = source.get_current_reporting_interval(); |
| |
| if !*source.get_is_polling().read().unwrap() { |
| break; |
| } |
| |
| match source |
| .as_any() |
| .downcast_ref::<TelemetrySource<T>>() |
| .expect("Failed to downcast source") |
| .value_reader |
| .read_values(update_interval, reporting_interval) |
| .await |
| { |
| Ok(values) => { |
| if !values.is_empty() { |
| if let Err(e) = manager.send_telemetries(&source_name, values).await |
| { |
| eprintln!("Error sending telemetries: {}", e); |
| } |
| } |
| } |
| Err(e) => eprintln!("Error reading values: {}", e), |
| } |
| } |
| } |
| }); |
| |
| Ok(()) |
| } |
| |
| /// Removes an telemetry source from the manager. |
| /// |
| /// CAUTION! All reference got from those query interfaces to this source must be dropped before making this call, |
| /// otherwise this call will deadlock. |
| /// |
| /// # Arguments |
| /// |
| /// * `source_name` - The name of the telemetry source to remove. |
| /// |
| /// # Returns |
| /// |
| /// A Result indicating success or failure. |
| #[allow(dead_code)] |
| pub async fn remove_source(self: &Arc<Self>, source_name: &str) -> Result<()> { |
| if let Some((_, source)) = self.sources.remove(source_name) { |
| source.set_stop_flag(true); |
| *source.get_is_polling().write().unwrap() = false; |
| |
| if let Some(secondary_keys) = source.get_secondary_keys() { |
| for (key_name, key_value) in secondary_keys { |
| if let Some(mut key_map) = self.secondary_map.get_mut(key_name) { |
| key_map.retain(|k, v| { |
| if k == key_value { |
| v.retain(|name| name != source_name); |
| !v.is_empty() |
| } else { |
| true |
| } |
| }); |
| } |
| } |
| } |
| self.telemetry_subscriptions.remove(source_name); |
| Ok(()) |
| } else { |
| Err(anyhow::anyhow!("Source not found: {}", source_name)) |
| } |
| } |
| |
| /// Returns a list of all secondary keys. |
| /// |
| /// # Returns |
| /// |
| /// A vector of strings representing all secondary keys. |
| #[allow(dead_code)] |
| pub fn query_all_secondary_keys(&self) -> Vec<String> { |
| self.secondary_map |
| .iter() |
| .map(|entry| entry.key().clone()) |
| .collect() |
| } |
| |
| /// Queries telemetry sources by primary keys. |
| /// |
| /// # Arguments |
| /// |
| /// * `primary_keys` - A slice of primary key strings. |
| /// |
| /// # Returns |
| /// |
| /// A Result containing a vector of telemetry sources matching the primary keys, |
| /// or an error if no sources are found. |
| pub fn query_sources_by_names(&self, primary_keys: &[&str]) -> Result<Vec<String>> { |
| let result: Vec<_> = if primary_keys.is_empty() { |
| self.sources |
| .iter() |
| .map(|entry| entry.key().clone()) |
| .collect() |
| } else { |
| primary_keys |
| .iter() |
| .filter_map(|&key| { |
| if self.sources.contains_key(key) { |
| Some(key.to_string()) |
| } else { |
| None |
| } |
| }) |
| .collect() |
| }; |
| if result.is_empty() { |
| Err(anyhow::anyhow!("No sources found")) |
| } else { |
| Ok(result) |
| } |
| } |
| |
| /// Queries telemetry sources by a secondary key property. |
| /// |
| /// # Arguments |
| /// |
| /// * `key` - A tuple containing the secondary key name and value. |
| /// |
| /// # Returns |
| /// |
| /// A Result containing a vector of telemetry sources matching the secondary key property, |
| /// or an error if the secondary key is not found or no sources match. |
| #[allow(dead_code)] |
| pub fn query_sources_by_property(&self, key: (&str, &str)) -> Result<Vec<String>> { |
| self.secondary_map |
| .get(key.0) |
| .and_then(|map| map.get(key.1).cloned()) |
| .ok_or_else(|| anyhow::anyhow!("Secondary key not found: {}", key.0)) |
| .and_then(|sources| { |
| if sources.is_empty() { |
| Err(anyhow::anyhow!("No sources found for the given property")) |
| } else { |
| Ok(sources) |
| } |
| }) |
| } |
| |
| /// Subscribes to telemetry source telemetries. |
| /// |
| /// CAUTION! The returned TelemetrySourceSubscription need to be moved to a separate task to |
| /// run, otherwise drop it may cause deadlock. |
| /// |
| /// # Arguments |
| /// |
| /// * `source_name` - The name of the telemetry source to subscribe to. |
| /// * `subscription_type` - The type of subscription (OnChange, Periodical). |
| /// |
| /// # Returns |
| /// |
| /// A Result containing an TelemetrySourceSubscription, or an error if the source is not found. |
| pub fn subscribe_telemetries<T: Clone + Send + PartialEq + 'static>( |
| self: &Arc<Self>, |
| source_name: &str, |
| subscription_type: SubscriptionType, |
| ) -> Result<TelemetrySourceSubscription<T>> { |
| if self.sources.contains_key(source_name) { |
| let (telemetry_sender, telemetry_receiver) = mpsc::channel(100); |
| let subscription_id = self.next_subscription_id.fetch_add(1, Ordering::SeqCst); |
| |
| let subscription: TypedSubscription<T> = TypedSubscription { |
| sender: telemetry_sender.clone(), |
| subscription_type: subscription_type.clone(), |
| last_telemetry_timestamp: Mutex::new(None), |
| current_value: Mutex::new(None), |
| }; |
| |
| self.telemetry_subscriptions |
| .entry(source_name.to_string()) |
| .or_default() |
| .insert(subscription_id, Box::new(subscription)); |
| |
| if self.telemetry_subscriptions.get(source_name).unwrap().len() == 1 { |
| if let Some(source) = self.sources.get(source_name) { |
| *source.get_is_polling().write().unwrap() = true; |
| source.get_wakeup_flag().notify_one(); |
| } |
| } |
| |
| let subscriptions = self.telemetry_subscriptions.get(source_name).unwrap(); |
| self.update_polling_interval(source_name, &subscriptions); |
| |
| Ok(TelemetrySourceSubscription { |
| source_name: source_name.to_string(), |
| subscription_id, |
| telemetry_receiver, |
| subscription_type, |
| telemetry_source_manager: self.clone(), |
| _phantom: PhantomData, |
| }) |
| } else { |
| Err(anyhow::anyhow!("Source not found: {}", source_name)) |
| } |
| } |
| |
| /// Unsubscribes from telemetry source telemetries. |
| /// |
| /// # Arguments |
| /// |
| /// * `source_name` - The name of the telemetry source. |
| /// * `subscription_id` - The subscription ID to unsubscribe. |
| /// |
| /// # Returns |
| /// |
| /// A Result indicating success or failure. |
| pub fn unsubscribe_telemetries(&self, source_name: &str, subscription_id: usize) -> Result<()> { |
| if let Some(mut subscriptions) = self.telemetry_subscriptions.get_mut(source_name) { |
| subscriptions.remove(&subscription_id); |
| if subscriptions.is_empty() { |
| if let Some(source) = self.sources.get(source_name) { |
| *source.get_is_polling().write().unwrap() = false; |
| source.set_current_interval(source.get_default_update_interval()); |
| } |
| } else { |
| self.update_polling_interval(source_name, &subscriptions); |
| } |
| Ok(()) |
| } else { |
| Err(anyhow::anyhow!("Source not found: {}", source_name)) |
| } |
| } |
| |
| /// Updates the polling interval for an telemetry source based on its subscriptions. |
| /// |
| /// # Arguments |
| /// |
| /// * `source_name` - The name of the telemetry source. |
| /// * `subscriptions` - The current subscriptions for the telemetry source. |
| fn update_polling_interval( |
| &self, |
| source_name: &str, |
| subscriptions: &HashMap<SubscriptionId, Box<dyn Subscription>>, |
| ) { |
| if let Some(source) = self.sources.get(source_name) { |
| let min_sampling_interval = subscriptions |
| .values() |
| .filter_map(|sub| match sub.get_subscription_type() { |
| SubscriptionType::Periodical(sampling_interval, _) => Some(*sampling_interval), |
| _ => None, |
| }) |
| .min() |
| .unwrap_or_else(|| source.get_default_update_interval()); |
| |
| source.set_current_interval(min_sampling_interval); |
| |
| let min_reporting_interval = subscriptions |
| .values() |
| .filter_map(|sub| match sub.get_subscription_type() { |
| SubscriptionType::Periodical(_, reporting_interval) => { |
| Some(*reporting_interval) |
| } |
| _ => None, |
| }) |
| .min() |
| .unwrap_or_else(|| source.get_current_reporting_interval()); |
| |
| source.set_current_reporting_interval(min_reporting_interval); |
| } |
| } |
| |
| /// Sends telemetry source telemetries to subscribers. |
| /// |
| /// # Arguments |
| /// |
| /// * `source_name` - The name of the telemetry source. |
| /// * `values` - The telemetry source telemetries to send. |
| /// |
| /// # Returns |
| /// |
| /// A Result indicating success or failure. |
| async fn send_telemetries<T: Clone + Send + Debug + PartialEq + 'static>( |
| &self, |
| source_name: &str, |
| values: Vec<(T, SystemTime)>, |
| ) -> Result<()> { |
| if let Some(mut subscriptions) = self.telemetry_subscriptions.get_mut(source_name) { |
| for (_, subscription) in subscriptions.iter_mut() { |
| let typed_subscription = subscription |
| .as_any_mut() |
| .downcast_mut::<TypedSubscription<T>>() |
| .unwrap(); |
| let telemetry_sender = typed_subscription.get_sender().clone(); |
| let subscription_type = typed_subscription.get_subscription_type().clone(); |
| let mut telemetries = Vec::new(); |
| |
| match subscription_type { |
| SubscriptionType::OnChange => { |
| for (value, timestamp) in &values { |
| let mut current_value = |
| typed_subscription.current_value.lock().unwrap(); |
| if current_value.as_ref() != Some(value) { |
| let telemetry = TypedTelemetry { |
| source_name: source_name.to_string(), |
| telemetry_type: "ValueChanged".to_string(), |
| value: value.clone(), |
| timestamp: *timestamp, |
| }; |
| telemetries.push(Box::new(telemetry) as Box<dyn Telemetry + Send>); |
| *current_value = Some(value.clone()); |
| } |
| } |
| } |
| SubscriptionType::Periodical(_, reporting_interval) => { |
| let mut should_send = false; |
| for (value, timestamp) in &values { |
| let mut last_timestamp = |
| typed_subscription.last_telemetry_timestamp.lock().unwrap(); |
| if let Some(last) = *last_timestamp { |
| match timestamp.duration_since(last) { |
| Ok(duration) if duration >= reporting_interval => { |
| should_send = true; |
| *last_timestamp = Some(*timestamp); |
| } |
| Ok(_) => { |
| // Continue accumulating samples |
| } |
| Err(_) => { |
| eprintln!("System time went backwards"); |
| *last_timestamp = Some(*timestamp); |
| should_send = true; |
| } |
| } |
| } else { |
| *last_timestamp = Some(*timestamp); |
| should_send = true; |
| } |
| |
| let telemetry = TypedTelemetry { |
| source_name: source_name.to_string(), |
| telemetry_type: "Periodical".to_string(), |
| value: value.clone(), |
| timestamp: *timestamp, |
| }; |
| telemetries.push(Box::new(telemetry) as Box<dyn Telemetry + Send>); |
| } |
| |
| if !should_send { |
| continue; |
| } |
| } |
| } |
| |
| if !telemetries.is_empty() { |
| telemetry_sender |
| .send(telemetries) |
| .await |
| .map_err(|e| anyhow::anyhow!("Failed to send telemetry: {}", e))?; |
| } |
| } |
| } |
| Ok(()) |
| } |
| |
| /// Gets the current value of an telemetry source. |
| /// |
| /// # Arguments |
| /// |
| /// * `source_name` - The name of the telemetry source. |
| /// |
| /// # Returns |
| /// |
| /// A Result containing the current value and timestamp, or an error if the source is not found. |
| #[allow(dead_code)] |
| pub async fn get_value<T: Clone + Send + 'static>( |
| &self, |
| source_name: &str, |
| ) -> Result<(T, SystemTime)> { |
| if let Some(source) = self.sources.get(source_name) { |
| if let Some(telemetry_source) = source.as_any().downcast_ref::<TelemetrySource<T>>() { |
| telemetry_source |
| .value_reader |
| .get_value() |
| .await |
| .map_err(|e| anyhow::anyhow!("Failed to get value: {}", e)) |
| } else { |
| Err(anyhow::anyhow!("Failed to downcast source")) |
| } |
| } else { |
| Err(anyhow::anyhow!("Source not found: {}", source_name)) |
| } |
| } |
| } |
| |
| /// Telemetry Source Subscription. |
| pub struct TelemetrySourceSubscription<T: Clone + Send + PartialEq + 'static> { |
| source_name: String, |
| subscription_id: SubscriptionId, |
| /// MPSC receiver for subscriber to receive Telemetries. |
| pub telemetry_receiver: Receiver<Vec<Box<dyn Telemetry + Send>>>, |
| #[allow(dead_code)] |
| subscription_type: SubscriptionType, |
| telemetry_source_manager: Arc<TelemetrySourceManager>, |
| _phantom: PhantomData<T>, |
| } |
| |
| impl<T: Clone + Send + PartialEq + 'static> Drop for TelemetrySourceSubscription<T> { |
| fn drop(&mut self) { |
| if let Err(e) = self |
| .telemetry_source_manager |
| .unsubscribe_telemetries(&self.source_name, self.subscription_id) |
| { |
| eprintln!("Error unsubscribing telemetries: {}", e); |
| } |
| } |
| } |
| |
| // Implement Debug for TelemetrySourceSubscription |
| impl<T: Clone + Send + PartialEq + 'static> fmt::Debug for TelemetrySourceSubscription<T> { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| f.debug_struct("TelemetrySourceSubscription") |
| .field("source_name", &self.source_name) |
| .field("subscription_id", &self.subscription_id) |
| .field("subscription_type", &self.subscription_type) |
| .finish() |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use std::sync::atomic::Ordering; |
| |
| use rand::Rng; |
| use tokio::time; |
| |
| use tokio::sync::Mutex; |
| |
/// Mock reader that produces random `f64` values.
struct MockAsyncReadValue {
    /// Timestamp up to which samples have already been generated.
    last_read_time: std::sync::Mutex<SystemTime>,
    /// Value most recently produced by `get_value`.
    current_value: std::sync::Mutex<f64>,
}
| |
| impl MockAsyncReadValue { |
| fn new() -> Self { |
| Self { |
| last_read_time: std::sync::Mutex::new(SystemTime::now()), |
| current_value: std::sync::Mutex::new(0.0), |
| } |
| } |
| } |
| |
| #[async_trait::async_trait] |
| impl AsyncReadValue<f64> for MockAsyncReadValue { |
| async fn read_values( |
| &self, |
| sampling_interval: Duration, |
| reporting_interval: Duration, |
| ) -> Result<Vec<(f64, SystemTime)>, std::io::Error> { |
| // Sleep interval is for mock purpose only |
| time::sleep(reporting_interval).await; |
| |
| let now = SystemTime::now(); |
| let mut values = Vec::new(); |
| |
| let mut last_time = { |
| let mut last_time = self.last_read_time.lock().unwrap(); |
| let new_last_time = std::cmp::max(*last_time, now - reporting_interval); |
| *last_time = new_last_time; |
| new_last_time |
| }; |
| |
| while last_time < now { |
| values.push((rand::thread_rng().gen::<f64>() * 100.0, last_time)); |
| last_time = last_time.checked_add(sampling_interval).unwrap_or(now); |
| } |
| |
| { |
| let mut last_time_lock = self.last_read_time.lock().unwrap(); |
| *last_time_lock = last_time; |
| } |
| |
| Ok(values) |
| } |
| |
| async fn get_value(&self) -> Result<(f64, SystemTime), std::io::Error> { |
| let now = SystemTime::now(); |
| let value = { |
| let mut value = self.current_value.lock().unwrap(); |
| *value = rand::thread_rng().gen::<f64>() * 100.0; |
| *value |
| }; |
| Ok((value, now)) |
| } |
| } |
| |
| #[tokio::test] |
| async fn test_telemetry_source_manager() { |
| let telemetry_source_manager = TelemetrySourceManager::new(); |
| let start_time = SystemTime::now(); |
| let mock_reader = Arc::new(MockAsyncReadValue::new()); |
| |
| // Add telemetry sources |
| let source1 = TelemetrySource::new( |
| "Source1".to_string(), |
| Some(vec![ |
| ("FRU".to_string(), "CPU".to_string()), |
| ("ReadingType".to_string(), "Temperature".to_string()), |
| ]), |
| mock_reader.clone(), |
| Duration::from_millis(10), |
| Duration::from_millis(1), |
| Duration::from_millis(50), |
| ); |
| let _ = telemetry_source_manager.add_source::<f64>(source1).await; |
| |
| let mock_reader_2 = Arc::new(MockAsyncReadValue::new()); |
| let source2 = TelemetrySource::new( |
| "Source2".to_string(), |
| Some(vec![ |
| ("FRU".to_string(), "GPU".to_string()), |
| ("ReadingType".to_string(), "Temperature".to_string()), |
| ]), |
| mock_reader_2.clone(), |
| Duration::from_millis(20), |
| Duration::from_millis(2), |
| Duration::from_millis(100), |
| ); |
| let _ = telemetry_source_manager.add_source::<f64>(source2).await; |
| |
| // Query all telemetry sources |
| let all_sources = telemetry_source_manager |
| .query_sources_by_names(&[]) |
| .unwrap(); |
| assert_eq!( |
| all_sources.len(), |
| 2, |
| "There should be 2 telemetry sources in the manager" |
| ); |
| |
| // Query by name |
| let source1 = telemetry_source_manager |
| .query_sources_by_names(&["Source1"]) |
| .unwrap(); |
| assert_eq!(source1.len(), 1); |
| assert_eq!(source1[0], "Source1"); |
| |
| // Query all secondary keys |
| let keys = telemetry_source_manager.query_all_secondary_keys(); |
| assert!(keys.contains(&"FRU".to_string())); |
| assert!(keys.contains(&"ReadingType".to_string())); |
| |
| // Query by secondary key |
| let cpu_sources = telemetry_source_manager |
| .query_sources_by_property(("FRU", "CPU")) |
| .unwrap(); |
| assert_eq!( |
| cpu_sources.len(), |
| 1, |
| "There should be 1 telemetry source with FRU 'CPU'" |
| ); |
| |
| // Verify that polling is not started |
| let source1 = telemetry_source_manager.sources.get("Source1").unwrap(); |
| assert!( |
| !*source1.get_is_polling().read().unwrap(), |
| "Polling should not start before subscription" |
| ); |
| |
| // Subscribe to telemetries from a source |
| let mut handles: Vec<tokio::task::JoinHandle<()>> = Vec::new(); |
| if let Ok(mut subscription) = telemetry_source_manager.subscribe_telemetries::<f64>( |
| "Source1", |
| SubscriptionType::Periodical(Duration::from_millis(10), Duration::from_millis(100)), |
| ) { |
| assert!( |
| *source1.get_is_polling().read().unwrap(), |
| "Polling should start after subscription" |
| ); |
| |
| let handle = tokio::spawn(async move { |
| let mut telemetry_count = 0; |
| while telemetry_count < 5 { |
| if let Some(telemetries) = subscription.telemetry_receiver.recv().await { |
| telemetry_count += 1; |
| for telemetry in telemetries.into_iter() { |
| let typed_telemetry = telemetry |
| .as_any() |
| .downcast_ref::<TypedTelemetry<f64>>() |
| .unwrap(); |
| assert!( |
| typed_telemetry.timestamp <= SystemTime::now(), |
| "Telemetry timestamp is in the future" |
| ); |
| assert!( |
| typed_telemetry.timestamp >= start_time, |
| "Telemetry timestamp is before start time" |
| ); |
| } |
| } |
| } |
| drop(subscription); |
| }); |
| handles.push(handle); |
| } |
| |
| // Wait for all telemetry handlers to complete |
| for handle in handles { |
| handle.await.unwrap(); |
| } |
| |
| assert!( |
| !*source1.get_is_polling().read().unwrap(), |
| "Polling should stop after all subscriptions dropped" |
| ); |
| |
| // Drop this first so you can call remove_source on it |
| drop(source1); |
| |
| // Remove an telemetry source |
| let _ = telemetry_source_manager.remove_source("Source1").await; |
| let remaining_sources = telemetry_source_manager |
| .query_sources_by_names(&[]) |
| .unwrap(); |
| assert_eq!( |
| remaining_sources.len(), |
| 1, |
| "There should be 1 telemetry source remaining" |
| ); |
| assert_eq!(remaining_sources[0], "Source2"); |
| |
| // Test get_value method |
| let value_result = telemetry_source_manager.get_value::<f64>("Source2").await; |
| assert!(value_result.is_ok(), "get_value should succeed"); |
| let (value, timestamp) = value_result.unwrap(); |
| assert!( |
| value >= 0.0 && value <= 100.0, |
| "Value should be between 0 and 100" |
| ); |
| assert!( |
| timestamp <= SystemTime::now(), |
| "Timestamp should not be in the future" |
| ); |
| |
| // Test get_value for non-existent source |
| let error_result = telemetry_source_manager |
| .get_value::<f64>("NonExistentSource") |
| .await; |
| assert!( |
| error_result.is_err(), |
| "get_value should fail for non-existent source" |
| ); |
| |
| println!("Done!"); |
| } |
| |
| struct MockAsyncReadValueF64 { |
| last_read_time: Mutex<SystemTime>, |
| current_value: Mutex<f64>, |
| } |
| |
| impl MockAsyncReadValueF64 { |
| fn new() -> Self { |
| Self { |
| last_read_time: Mutex::new(SystemTime::now()), |
| current_value: Mutex::new(0.0), |
| } |
| } |
| } |
| |
| #[async_trait::async_trait] |
| impl AsyncReadValue<f64> for MockAsyncReadValueF64 { |
| async fn read_values( |
| &self, |
| sampling_interval: Duration, |
| reporting_interval: Duration, |
| ) -> Result<Vec<(f64, SystemTime)>, std::io::Error> { |
| time::sleep(reporting_interval).await; |
| |
| let now = SystemTime::now(); |
| let mut values = Vec::new(); |
| |
| let mut last_time = { |
| let mut last_time = self.last_read_time.lock().await; |
| let new_last_time = std::cmp::max(*last_time, now - reporting_interval); |
| *last_time = new_last_time; |
| new_last_time |
| }; |
| |
| while last_time < now { |
| let value = rand::thread_rng().gen::<f64>() * 100.0; |
| values.push((value, last_time)); |
| last_time = last_time.checked_add(sampling_interval).unwrap_or(now); |
| } |
| |
| { |
| let mut last_time_lock = self.last_read_time.lock().await; |
| *last_time_lock = last_time; |
| } |
| |
| Ok(values) |
| } |
| |
| async fn get_value(&self) -> Result<(f64, SystemTime), std::io::Error> { |
| let now = SystemTime::now(); |
| let value = *self.current_value.lock().await; |
| Ok((value, now)) |
| } |
| } |
| |
| /// Why not make MockAsyncReadValue a generic struct of 'T'? |
| /// because this will make those downcast_ref fail -- in the TelemetrySourceManager, it does not |
| /// have the knowledge of type 'T' for each TelemetrySource. |
| struct MockAsyncReadValueBytes { |
| last_read_time: Mutex<SystemTime>, |
| current_value: Mutex<Arc<[u8]>>, |
| } |
| |
| impl MockAsyncReadValueBytes { |
| fn new() -> Self { |
| Self { |
| last_read_time: Mutex::new(SystemTime::now()), |
| current_value: Mutex::new(Arc::new([0u8; 10])), |
| } |
| } |
| } |
| |
| #[async_trait::async_trait] |
| impl AsyncReadValue<Arc<[u8]>> for MockAsyncReadValueBytes { |
| async fn read_values( |
| &self, |
| sampling_interval: Duration, |
| reporting_interval: Duration, |
| ) -> Result<Vec<(Arc<[u8]>, SystemTime)>, std::io::Error> { |
| time::sleep(reporting_interval).await; |
| |
| let now = SystemTime::now(); |
| let mut values = Vec::new(); |
| |
| let mut last_time = { |
| let mut last_time = self.last_read_time.lock().await; |
| let new_last_time = std::cmp::max(*last_time, now - reporting_interval); |
| *last_time = new_last_time; |
| new_last_time |
| }; |
| |
| while last_time < now { |
| let value: Arc<[u8]> = Arc::new(rand::thread_rng().gen::<[u8; 10]>()); |
| values.push((value, last_time)); |
| last_time = last_time.checked_add(sampling_interval).unwrap_or(now); |
| } |
| |
| { |
| let mut last_time_lock = self.last_read_time.lock().await; |
| *last_time_lock = last_time; |
| } |
| |
| Ok(values) |
| } |
| |
| async fn get_value(&self) -> Result<(Arc<[u8]>, SystemTime), std::io::Error> { |
| let now = SystemTime::now(); |
| let value = self.current_value.lock().await.clone(); |
| Ok((value, now)) |
| } |
| } |
| |
| #[tokio::test] |
| async fn test_telemetry_source_manager_with_different_types() { |
| let telemetry_source_manager = TelemetrySourceManager::new(); |
| |
| // Test with f64 |
| let mock_reader_f64 = Arc::new(MockAsyncReadValueF64::new()); |
| let source_f64 = TelemetrySource::new( |
| "SourceF64".to_string(), |
| None, |
| mock_reader_f64.clone(), |
| Duration::from_millis(10), |
| Duration::from_millis(1), |
| Duration::from_millis(50), |
| ); |
| let _ = telemetry_source_manager.add_source::<f64>(source_f64).await; |
| |
| // Test with Arc<[u8]> |
| let mock_reader_bytes = Arc::new(MockAsyncReadValueBytes::new()); |
| let source_bytes = TelemetrySource::new( |
| "SourceBytes".to_string(), |
| None, |
| mock_reader_bytes.clone(), |
| Duration::from_millis(20), |
| Duration::from_millis(2), |
| Duration::from_millis(100), |
| ); |
| let _ = telemetry_source_manager |
| .add_source::<Arc<[u8]>>(source_bytes) |
| .await; |
| |
| // Test get_value for f64 |
| let result_f64 = telemetry_source_manager.get_value::<f64>("SourceF64").await; |
| assert!(result_f64.is_ok(), "get_value should succeed for f64"); |
| let (value_f64, timestamp_f64) = result_f64.unwrap(); |
| assert!( |
| value_f64 >= 0.0 && value_f64 <= 100.0, |
| "f64 value should be between 0 and 100" |
| ); |
| assert!( |
| timestamp_f64 <= SystemTime::now(), |
| "f64 timestamp should not be in the future" |
| ); |
| |
| // Test get_value for Arc<[u8]> |
| let result_bytes = telemetry_source_manager |
| .get_value::<Arc<[u8]>>("SourceBytes") |
| .await; |
| assert!( |
| result_bytes.is_ok(), |
| "get_value should succeed for Arc<[u8]>" |
| ); |
| let (value_bytes, timestamp_bytes) = result_bytes.unwrap(); |
| assert_eq!(value_bytes.len(), 10, "Byte array should have length 10"); |
| assert!( |
| timestamp_bytes <= SystemTime::now(), |
| "Bytes timestamp should not be in the future" |
| ); |
| |
| // Test subscribing to telemetries for both types |
| let mut subscription_f64 = telemetry_source_manager |
| .subscribe_telemetries::<f64>("SourceF64", SubscriptionType::OnChange) |
| .unwrap(); |
| |
| let mut subscription_bytes = telemetry_source_manager |
| .subscribe_telemetries::<Arc<[u8]>>("SourceBytes", SubscriptionType::OnChange) |
| .unwrap(); |
| |
| // Receive and verify telemetries for f64 |
| if let Some(telemetries) = tokio::time::timeout( |
| Duration::from_secs(6), |
| subscription_f64.telemetry_receiver.recv(), |
| ) |
| .await |
| .unwrap() |
| { |
| for telemetry in telemetries { |
| let typed_telemetry = telemetry |
| .as_any() |
| .downcast_ref::<TypedTelemetry<f64>>() |
| .unwrap(); |
| assert!( |
| typed_telemetry.value >= 0.0 && typed_telemetry.value <= 100.0, |
| "f64 telemetry value should be between 0 and 100" |
| ); |
| } |
| } |
| |
| // Receive and verify telemetries for Arc<[u8]> |
| if let Some(telemetries) = tokio::time::timeout( |
| Duration::from_secs(6), |
| subscription_bytes.telemetry_receiver.recv(), |
| ) |
| .await |
| .unwrap() |
| { |
| for telemetry in telemetries { |
| let typed_telemetry = telemetry |
| .as_any() |
| .downcast_ref::<TypedTelemetry<Arc<[u8]>>>() |
| .unwrap(); |
| assert_eq!( |
| typed_telemetry.value.len(), |
| 10, |
| "Byte array in telemetry should have length 10" |
| ); |
| } |
| } |
| |
| // Test get_value for non-existent source |
| let error_result = telemetry_source_manager |
| .get_value::<f64>("NonExistentSource") |
| .await; |
| assert!( |
| error_result.is_err(), |
| "get_value should fail for non-existent source" |
| ); |
| } |
| |
/// Test double that records, in milliseconds, the intervals it was last asked to read with,
/// so tests can inspect them afterwards.
struct MockAsyncReadSubParam {
    /// Sampling interval (ms) passed to the most recent `read_values` call; `0` until then.
    sampling_interval: Arc<AtomicUsize>,
    /// Reporting interval (ms) passed to the most recent `read_values` call; `0` until then.
    reporting_interval: Arc<AtomicUsize>,
}

impl MockAsyncReadSubParam {
    /// Creates a mock with both recorded intervals set to zero.
    fn new() -> Self {
        let zeroed = || Arc::new(AtomicUsize::new(0));
        Self {
            sampling_interval: zeroed(),
            reporting_interval: zeroed(),
        }
    }
}
| |
| #[async_trait::async_trait] |
| impl AsyncReadValue<f64> for MockAsyncReadSubParam { |
| async fn read_values( |
| &self, |
| sampling_interval: Duration, |
| reporting_interval: Duration, |
| ) -> Result<Vec<(f64, SystemTime)>, std::io::Error> { |
| self.sampling_interval |
| .store(sampling_interval.as_millis() as usize, Ordering::SeqCst); |
| self.reporting_interval |
| .store(reporting_interval.as_millis() as usize, Ordering::SeqCst); |
| |
| let start = tokio::time::Instant::now(); |
| let mut values = Vec::new(); |
| |
| while start.elapsed() < reporting_interval { |
| values.push((0.0, SystemTime::now())); |
| tokio::time::sleep(sampling_interval).await; |
| } |
| |
| Ok(values) |
| } |
| |
| async fn get_value(&self) -> Result<(f64, SystemTime), std::io::Error> { |
| Ok((0.0, SystemTime::now())) |
| } |
| } |
| |
| #[tokio::test] |
| async fn test_single_subscription_intervals() { |
| let telemetry_source_manager = TelemetrySourceManager::new(); |
| let mock_reader = Arc::new(MockAsyncReadSubParam::new()); |
| |
| let source = TelemetrySource::new( |
| "TestSource".to_string(), |
| None, |
| mock_reader.clone(), |
| Duration::from_secs(1), |
| Duration::from_millis(10), |
| Duration::from_secs(10), |
| ); |
| telemetry_source_manager |
| .add_source::<f64>(source) |
| .await |
| .unwrap(); |
| |
| let sampling_interval = Duration::from_millis(20); |
| let reporting_interval = Duration::from_millis(100); |
| let mut subscription = telemetry_source_manager |
| .subscribe_telemetries::<f64>( |
| "TestSource", |
| SubscriptionType::Periodical(sampling_interval, reporting_interval), |
| ) |
| .unwrap(); |
| |
| // Wait for the first batch of telemetry to be received |
| let timeout_duration = Duration::from_millis(150); |
| match tokio::time::timeout(timeout_duration, subscription.telemetry_receiver.recv()).await { |
| Ok(Some(telemetries)) => { |
| // Check the intervals |
| assert_eq!( |
| mock_reader.sampling_interval.load(Ordering::SeqCst), |
| sampling_interval.as_millis() as usize, |
| "Sampling interval should match the subscription" |
| ); |
| assert_eq!( |
| mock_reader.reporting_interval.load(Ordering::SeqCst), |
| reporting_interval.as_millis() as usize, |
| "Reporting interval should match the subscription" |
| ); |
| |
| // Check batch reporting |
| let expected_count = reporting_interval.as_millis() / sampling_interval.as_millis(); |
| let acceptable_range = (expected_count - 1)..=(expected_count + 1); |
| assert!( |
| acceptable_range.contains(&(telemetries.len() as u128)), |
| "Number of telemetries ({}) should be approximately equal to reporting_interval/sampling_interval ({})", |
| telemetries.len(), |
| expected_count |
| ); |
| } |
| Ok(None) => panic!("Channel closed unexpectedly"), |
| Err(_) => panic!("Timeout waiting for telemetry"), |
| } |
| } |
| |
| #[tokio::test] |
| async fn test_multiple_subscriptions_and_unsubscribe() { |
| let telemetry_source_manager = TelemetrySourceManager::new(); |
| let mock_reader = Arc::new(MockAsyncReadSubParam::new()); |
| |
| let source = TelemetrySource::new( |
| "TestSource".to_string(), |
| None, |
| mock_reader.clone(), |
| Duration::from_secs(1), |
| Duration::from_millis(10), |
| Duration::from_secs(10), |
| ); |
| telemetry_source_manager |
| .add_source::<f64>(source) |
| .await |
| .unwrap(); |
| |
| let sampling_interval1 = Duration::from_millis(20); |
| let reporting_interval1 = Duration::from_millis(80); |
| let mut subscription1 = telemetry_source_manager |
| .subscribe_telemetries::<f64>( |
| "TestSource", |
| SubscriptionType::Periodical(sampling_interval1, reporting_interval1), |
| ) |
| .unwrap(); |
| |
| let sampling_interval2 = Duration::from_millis(30); |
| let reporting_interval2 = Duration::from_millis(60); |
| let mut subscription2 = telemetry_source_manager |
| .subscribe_telemetries::<f64>( |
| "TestSource", |
| SubscriptionType::Periodical(sampling_interval2, reporting_interval2), |
| ) |
| .unwrap(); |
| |
| // Wait for the first telemetry from either subscription |
| let timeout_duration = Duration::from_millis(100); |
| match tokio::time::timeout(timeout_duration, async { |
| tokio::select! { |
| _ = subscription1.telemetry_receiver.recv() => {}, |
| _ = subscription2.telemetry_receiver.recv() => {}, |
| } |
| }) |
| .await |
| { |
| Ok(_) => { |
| assert_eq!( |
| mock_reader.sampling_interval.load(Ordering::SeqCst), |
| sampling_interval1.as_millis() as usize, |
| "Sampling interval should be the minimum of the two subscriptions" |
| ); |
| assert_eq!( |
| mock_reader.reporting_interval.load(Ordering::SeqCst), |
| reporting_interval2.as_millis() as usize, |
| "Reporting interval should be the minimum of the two subscriptions" |
| ); |
| } |
| Err(_) => panic!("Timeout waiting for telemetry"), |
| } |
| // Unsubscribe the first subscription |
| drop(subscription1); |
| |
| // Wait a bit to ensure the unsubscribe takes effect |
| tokio::time::sleep(Duration::from_millis(100)).await; |
| |
| // Wait for the next telemetry from the remaining subscription |
| match tokio::time::timeout(timeout_duration, subscription2.telemetry_receiver.recv()).await |
| { |
| Ok(Some(_)) => { |
| assert_eq!( |
| mock_reader.sampling_interval.load(Ordering::SeqCst), |
| sampling_interval2.as_millis() as usize, |
| "Sampling interval should now match the remaining subscription" |
| ); |
| assert_eq!( |
| mock_reader.reporting_interval.load(Ordering::SeqCst), |
| reporting_interval2.as_millis() as usize, |
| "Reporting interval should now match the remaining subscription" |
| ); |
| } |
| Ok(None) => panic!("Channel closed unexpectedly"), |
| Err(_) => panic!("Timeout waiting for telemetry after unsubscribe"), |
| } |
| |
| // Unsubscribe the second subscription |
| drop(subscription2); |
| |
| // Wait a bit to ensure the unsubscribe takes effect |
| tokio::time::sleep(Duration::from_millis(100)).await; |
| |
| // Trigger a new read by creating a temporary subscription |
| let mut temp_subscription = telemetry_source_manager |
| .subscribe_telemetries::<f64>( |
| "TestSource", |
| SubscriptionType::Periodical(Duration::from_millis(1), Duration::from_millis(10)), |
| ) |
| .unwrap(); |
| |
| match tokio::time::timeout( |
| timeout_duration, |
| temp_subscription.telemetry_receiver.recv(), |
| ) |
| .await |
| { |
| Ok(Some(_)) => { |
| assert_eq!( |
| mock_reader.sampling_interval.load(Ordering::SeqCst), |
| Duration::from_millis(10).as_millis() as usize, |
| "Sampling interval should not lower than minimum_update_interval" |
| ); |
| } |
| Ok(None) => panic!("Channel closed unexpectedly"), |
| Err(_) => panic!("Timeout waiting for telemetry after all unsubscribes"), |
| } |
| } |
| } |