| # Serverless Telemetry Source Manager |
| |
| ## Introduction |
| |
| The Serverless Telemetry Source Manager aims to simplify telemetry management by |
| eliminating the need for separate systemd services, as used in approaches like |
| the OpenBMC [D-Bus sensor daemon](https://github.com/openbmc/dbus-sensors.git). |
| This solution removes the daemon layer and inter-process communication (IPC), |
| resulting in a more straightforward implementation. |
| |
| ## Northbound API |
| |
| The telemetry source manager exposes three main APIs: |
| |
| 1. **`add_source()`**: Adds a telemetry source to the manager (or removes one), |
| which then polls the source for value updates. |
| 2. **`query_sources()`**: Queries telemetry sources by secondary key (one of the |
| source properties, such as the FRU it belongs to) or by primary key (the source |
| name). |
| 3. **`subscribe_telemetries()`**: Lets subscribers receive telemetry events, |
| either periodically or whenever the value changes. |
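The primary/secondary key lookup described above can be pictured with a small in-memory index. This is only a sketch under stated assumptions: `SourceIndex` and its methods are hypothetical names invented for illustration, and nothing here reflects how the manager actually stores its sources.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical in-memory index; not the manager's actual data structure.
#[derive(Default)]
struct SourceIndex {
    /// Primary key (source name) -> property key/value pairs.
    by_name: BTreeMap<String, Vec<(String, String)>>,
    /// Secondary key (property key, property value) -> source names.
    by_property: BTreeMap<(String, String), BTreeSet<String>>,
}

impl SourceIndex {
    /// Registers a source under its name and under each of its properties.
    fn add_source(&mut self, name: &str, properties: &[(&str, &str)]) {
        let owned: Vec<(String, String)> = properties
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect();
        for pair in &owned {
            self.by_property
                .entry(pair.clone())
                .or_default()
                .insert(name.to_string());
        }
        self.by_name.insert(name.to_string(), owned);
    }

    /// Looks up by primary key; an empty slice returns every source name.
    fn query_by_names(&self, names: &[&str]) -> Vec<String> {
        if names.is_empty() {
            return self.by_name.keys().cloned().collect();
        }
        names
            .iter()
            .filter(|n| self.by_name.contains_key(**n))
            .map(|n| n.to_string())
            .collect()
    }

    /// Looks up by secondary key: one property key/value pair.
    fn query_by_property(&self, key: &str, value: &str) -> Vec<String> {
        self.by_property
            .get(&(key.to_string(), value.to_string()))
            .map(|set| set.iter().cloned().collect())
            .unwrap_or_default()
    }
}
```

Keeping a second map keyed by `(property key, property value)` lets a secondary-key query avoid scanning every source; the ordered maps are used here only to make the results deterministic.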
| |
| ## Southbound API |
| |
| Clients must provide a method to poll the telemetry value when adding a source |
| to the manager, as defined by the asynchronous trait `AsyncReadValue`. For I2C |
| sources, this can be a simple wrapper to read the I2C device via the sysfs |
| interface. This interface abstraction facilitates integration with third-party |
| vendors. |
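As a rough illustration of the sysfs wrapper mentioned above, the sketch below reads one attribute file and converts it to a floating-point value. It is deliberately synchronous and uses only the standard library, so it is not an `AsyncReadValue` implementation; `parse_millidegrees` and `read_sysfs_value` are hypothetical helper names, and the millidegree scaling assumes a hwmon-style temperature attribute.

```rust
use std::fs;
use std::io::{self, ErrorKind};
use std::path::Path;
use std::time::SystemTime;

/// Parses a raw hwmon-style reading such as "42500\n" (millidegrees Celsius)
/// into degrees Celsius. Assumption: the attribute reports millidegrees.
fn parse_millidegrees(raw: &str) -> io::Result<f64> {
    let milli = raw
        .trim()
        .parse::<i64>()
        .map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?;
    Ok(milli as f64 / 1000.0)
}

/// Reads one value from a sysfs attribute and stamps it with the read time.
/// The path is supplied by the caller; no specific device is assumed.
fn read_sysfs_value(path: &Path) -> io::Result<(f64, SystemTime)> {
    let raw = fs::read_to_string(path)?;
    Ok((parse_millidegrees(&raw)?, SystemTime::now()))
}
```

A real reader would presumably call something like `read_sysfs_value` from inside the trait's `get_value()` method, using an asynchronous file read instead of the blocking one shown here.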
| |
| ## Use case example |
| |
| The `src/main.rs` file demonstrates a complete use case from a client's |
| perspective.<br> Typically, a single client creates the manager, but there is |
| currently no limit on how many clients can query it and subscribe to it |
| simultaneously. |
| |
| 1. Adding telemetry sources<br> Telemetry sources can be added to the manager; |
| the polling task starts automatically in the background once a subscription |
| is created:<br> |
| |
| ```rust |
| // Standard-library types used throughout this example |
| use std::sync::Arc; |
| use std::time::{Duration, SystemTime}; |
| |
| // Mock implementation of AsyncReadValue |
| struct MockAsyncReadValue { /* Implementation details... */ } |
| |
| #[async_trait::async_trait] |
| impl AsyncReadValue<f64> for MockAsyncReadValue { |
|     // Returns the timestamped samples collected for one reporting interval |
|     async fn read_values( |
|         &self, |
|         sampling_interval: Duration, |
|         reporting_interval: Duration, |
|     ) -> Result<Vec<(f64, SystemTime)>, std::io::Error> { |
|         // Implementation details... |
|     } |
| |
|     // Returns a single timestamped reading |
|     async fn get_value(&self) -> Result<(f64, SystemTime), std::io::Error> { |
|         // Implementation details... |
|     } |
| } |
| |
| // The telemetry source manager can be cloned and shared by other threads/tasks freely |
| let telemetry_source_manager = TelemetrySourceManager::new(); |
| |
| let mock_reader = Arc::new(MockAsyncReadValue::new()); |
| |
| // Add telemetry sources to the manager. |
| // A source can have an arbitrary number of properties as key/value pairs. |
| // The property key names are not restricted, though they should follow the Redfish schema. |
| let source1 = TelemetrySource::new( |
|     "Source1".to_string(), |
|     Some(vec![ |
|         ("FRU".to_string(), "CPU".to_string()), |
|         ("ReadingType".to_string(), "Temperature".to_string()), |
|     ]), |
|     mock_reader.clone(), |
|     Duration::from_secs(1), |
|     Duration::from_millis(100), |
|     Duration::from_secs(5), |
| ); |
| telemetry_source_manager.add_source::<f64>(source1).await.unwrap(); |
| |
| // Add more sources as needed... |
| |
| ``` |
| |
| 2. Query telemetry sources<br> |
| |
| 2.1 By primary key: |
| |
| ```rust |
| // Query sources by primary keys: empty input means all sources |
| if let Ok(sources) = telemetry_source_manager.query_sources_by_names(&[]) { |
|     println!("All sources in manager:"); |
|     for source in sources { |
|         println!("{}", source); |
|     } |
| } |
| |
| // Query source by primary keys: one source |
| if let Ok(sources) = telemetry_source_manager.query_sources_by_names(&["Source1"]) { |
|     assert_eq!(sources.len(), 1); |
| } |
| ``` |
| |
| 2.2 By secondary keys: |
| |
| ```rust |
| // Query all available secondary keys in the manager |
| let keys = telemetry_source_manager.query_all_secondary_keys(); |
| println!("Secondary keys: {:?}", keys); |
| |
| // Query sources by secondary key |
| if let Ok(sources) = telemetry_source_manager.query_sources_by_property(("FRU", "CPU")) { |
|     for source in sources { |
|         println!("Source with FRU CPU: {}", source); |
|     } |
| } |
| ``` |
| |
| 3. Subscribe to telemetry events<br> |
| |
| 3.1 Query sources by reading type, then subscribe to telemetry events from |
| all of them |
| |
| ```rust |
| if let Ok(temperature_sources) = |
|     telemetry_source_manager.query_sources_by_property(("ReadingType", "Temperature")) |
| { |
|     for source in temperature_sources { |
|         println!("Subscribe to temperature source: {}", source); |
|         if let Ok(mut subscription) = |
|             telemetry_source_manager.subscribe_telemetries::<f64>(&source, SubscriptionType::OnChange) |
|         { |
|             tokio::spawn(async move { |
|                 while let Some(telemetries) = subscription.telemetry_receiver.recv().await { |
|                     for telemetry in telemetries { |
|                         println!("Received {:?}", telemetry); |
|                     } |
|                 } |
|             }); |
|         } |
|     } |
| } |
| ``` |
| |
| 3.2 Subscribe to `Periodical` telemetry events, which increases the polling |
| rate of this source |
| |
| ```rust |
| if let Ok(subscription) = telemetry_source_manager.subscribe_telemetries::<f64>( |
|     "Source2", |
|     SubscriptionType::Periodical(Duration::from_millis(1), Duration::from_millis(10)), |
| ) { |
|     let telemetry_source_manager_clone = telemetry_source_manager.clone(); |
|     if let Ok(sources) = telemetry_source_manager_clone.query_sources_by_names(&["Source2"]) { |
|         println!("Source2 before subscription dropped: {}", sources[0]); |
|     } |
| |
|     tokio::spawn(async move { |
|         // Subscription handling logic... |
|     }); |
| } |
| ``` |
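To make the difference between the two subscription types more concrete, here is a minimal standalone sketch of the delivery semantics they suggest. It rests on assumptions that the source does not confirm: that `OnChange` suppresses repeated values, and that the two durations passed to `Periodical` are a sampling interval followed by a reporting interval. The function names `on_change`, `samples_per_report`, and `periodical_batches` are hypothetical and are not part of the manager's API.

```rust
use std::time::Duration;

/// Keeps only samples whose value differs from the previously delivered one
/// (assumed on-change behaviour).
fn on_change(samples: &[f64]) -> Vec<f64> {
    let mut out = Vec::new();
    let mut last: Option<f64> = None;
    for &value in samples {
        if last != Some(value) {
            out.push(value);
            last = Some(value);
        }
    }
    out
}

/// Number of samples that fit into one report, assuming the durations are
/// (sampling interval, reporting interval). Always at least one.
fn samples_per_report(sampling: Duration, reporting: Duration) -> usize {
    let sampling_ns = sampling.as_nanos().max(1); // avoid dividing by zero
    ((reporting.as_nanos() / sampling_ns) as usize).max(1)
}

/// Groups samples into per-report batches; the last batch may be shorter.
fn periodical_batches(samples: &[f64], per_report: usize) -> Vec<Vec<f64>> {
    samples
        .chunks(per_report.max(1))
        .map(|chunk| chunk.to_vec())
        .collect()
}
```

Under these assumptions, a 1 ms sampling interval with a 10 ms reporting interval would yield ten samples per delivered batch, which would be consistent with the receiver yielding a collection of telemetries on each `recv()`.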
| |
| ## Sample output |
| |
| ```text |
| All sources in manager: |
| Source2 |
| Source3 |
| Source1 |
| Secondary keys: ["ReadingType", "FRU"] |
| Source with FRU CPU: Source1 |
| Source with FRU CPU: Source3 |
| Subscribe to temperature source: Source1 |
| Subscribe to temperature source: Source2 |
| Source2 before subscription dropped: Source2 |
| Received TypedTelemetry { source_name: "Source3", telemetry_type: "OnChange", value: 66.81906494537024, timestamp: SystemTime { tv_sec: 616376, tv_nsec: 949746575 } } |
| Received TypedTelemetry { source_name: "Source2", telemetry_type: "Periodical", value: 21.584232735733732, timestamp: SystemTime { tv_sec: 616376, tv_nsec: 949765075 } } |
| Received TypedTelemetry { source_name: "Source1", telemetry_type: "Periodical", value: 69.1666621987969, timestamp: SystemTime { tv_sec: 616376, tv_nsec: 949819185 } } |
| ... |
| Subscription to Source2 dropped after 15 seconds |
| Source2 after subscription dropped: Source2 |
| ... |
| ``` |
| |
| This README provides an overview of the Serverless Telemetry Source Manager, its |
| APIs, and examples of how to use it. For more detailed information, please refer |
| to the source code and comments within the `telemetry_source_manager.rs` file. |