The Serverless Telemetry Source Manager aims to simplify telemetry management by eliminating the need for separate systemd services, as used in approaches like the OpenBMC D-Bus sensor daemon. This solution removes the daemon layer and inter-process communication (IPC), resulting in a more straightforward implementation.
The telemetry source manager exposes three main APIs:
- add_source(): Adds a telemetry source to the manager (or removes one); the manager then polls each added source for value updates.
- query_sources(): Queries telemetry sources by primary key (the source name) or by secondary key (any source property, such as the FRU the source belongs to).
- subscribe_telemetries(): Lets subscribers receive telemetry events, either periodically or whenever the value changes.

Clients must provide a method to poll the telemetry value when adding a source to the manager, as defined by the asynchronous trait AsyncReadValue. For I2C sources, this can be a simple wrapper that reads the I2C device through the sysfs interface. This abstraction facilitates integration with third-party vendors.
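As an illustration of the sysfs wrapper idea, here is a minimal sketch of the synchronous core such a reader might use. The helper name `read_sysfs_value` and its `scale` parameter are hypothetical and not part of this crate; the return type simply mirrors the `get_value` signature shown in this README. A real `AsyncReadValue` implementation would presumably call something like this from its async methods.

```rust
use std::fs;
use std::io;
use std::path::Path;
use std::time::SystemTime;

/// Reads one sample from a sysfs attribute file and scales it.
///
/// Hypothetical helper, not part of the crate. hwmon temperature files
/// such as `temp1_input` report millidegrees Celsius, so a `scale` of
/// 0.001 converts the raw reading to degrees.
fn read_sysfs_value(path: &Path, scale: f64) -> io::Result<(f64, SystemTime)> {
    // sysfs attributes are small text files holding a single number.
    let raw = fs::read_to_string(path)?;
    // Map parse failures onto io::Error so callers see one error type.
    let parsed: f64 = raw
        .trim()
        .parse()
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
    // Timestamp the sample at read time.
    Ok((parsed * scale, SystemTime::now()))
}
```

Keeping the file access in one small function makes it easy to swap in a vendor-specific reader without touching the manager.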
The src/main.rs file demonstrates a complete use case from a client's perspective.
Typically, one client handles the manager creation, but there is currently no limit on how many clients can query and subscribe simultaneously.
1. Adding telemetry sources
Add telemetry sources to the manager; the polling task starts running in the background automatically once a subscription is made:
    // Mock implementation of AsyncReadValue
    struct MockAsyncReadValue {
        // Implementation details...
    }

    #[async_trait::async_trait]
    impl AsyncReadValue<f64> for MockAsyncReadValue {
        async fn read_values(
            &self,
            sampling_interval: Duration,
            reporting_interval: Duration,
        ) -> Result<Vec<(f64, SystemTime)>, std::io::Error> {
            // Implementation details...
        }

        async fn get_value(&self) -> Result<(f64, SystemTime), std::io::Error> {
            // Implementation details...
        }
    }

    // The telemetry source manager can be cloned and shared by other threads/tasks freely
    let telemetry_source_manager = TelemetrySourceManager::new();
    let mock_reader = Arc::new(MockAsyncReadValue::new());

    // Add telemetry sources to the manager
    // A source can have an arbitrary number of properties as key/value pairs
    // The property key names are not restricted, though they should follow the Redfish data schema
    let source1 = TelemetrySource::new(
        "Source1".to_string(),
        Some(vec![
            ("FRU".to_string(), "CPU".to_string()),
            ("ReadingType".to_string(), "Temperature".to_string()),
        ]),
        mock_reader.clone(),
        Duration::from_secs(1),
        Duration::from_millis(100),
        Duration::from_secs(5),
    );
    telemetry_source_manager.add_source::<f64>(source1).await.unwrap();

    // Add more sources as needed...
2. Query telemetry sources
2.1 By primary key:
    // Query sources by primary keys: empty input means all sources
    if let Ok(sources) = telemetry_source_manager.query_sources_by_names(&[]) {
        println!("All sources in manager:");
        for source in sources {
            println!("{}", source);
        }
    }

    // Query source by primary keys: one source
    if let Ok(sources) = telemetry_source_manager.query_sources_by_names(&["Source1"]) {
        assert_eq!(sources.len(), 1);
    }
2.2 By secondary keys:
    // Query all available secondary keys in the manager
    let keys = telemetry_source_manager.query_all_secondary_keys();
    println!("Secondary keys: {:?}", keys);

    // Query sources by secondary key
    if let Ok(sources) = telemetry_source_manager.query_sources_by_property(("FRU", "CPU")) {
        for source in sources {
            println!("Source with FRU CPU: {}", source);
        }
    }
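To make the primary/secondary key distinction concrete, the following is a minimal sketch of one way such lookups could be modeled in memory. `SourceIndex` and its methods are hypothetical names chosen for illustration; the crate's actual data structures may differ, so treat this as an assumption rather than a description of the implementation.

```rust
use std::collections::{BTreeSet, HashMap};

/// Hypothetical in-memory index; not the crate's actual data structure.
#[derive(Default)]
struct SourceIndex {
    /// Primary key (source name) -> that source's properties.
    by_name: HashMap<String, Vec<(String, String)>>,
    /// Secondary key (property name, property value) -> source names.
    by_property: HashMap<(String, String), BTreeSet<String>>,
}

impl SourceIndex {
    /// Registers a source under its name and under each of its properties.
    fn add(&mut self, name: &str, properties: &[(&str, &str)]) {
        let mut owned = Vec::new();
        for (key, value) in properties {
            let pair = (key.to_string(), value.to_string());
            self.by_property
                .entry(pair.clone())
                .or_default()
                .insert(name.to_string());
            owned.push(pair);
        }
        self.by_name.insert(name.to_string(), owned);
    }

    /// Looks up names by primary key; an empty slice means "all sources".
    /// Unknown names are skipped and the result is sorted by name.
    fn query_by_names(&self, names: &[&str]) -> Vec<String> {
        let mut found: Vec<String> = if names.is_empty() {
            self.by_name.keys().cloned().collect()
        } else {
            names
                .iter()
                .filter(|n| self.by_name.contains_key(**n))
                .map(|n| n.to_string())
                .collect()
        };
        found.sort();
        found
    }

    /// Looks up names by one secondary key, sorted by name.
    fn query_by_property(&self, key: &str, value: &str) -> Vec<String> {
        self.by_property
            .get(&(key.to_string(), value.to_string()))
            .map(|names| names.iter().cloned().collect())
            .unwrap_or_default()
    }
}
```

Maintaining a second map keyed by property makes secondary-key queries a single lookup, at the cost of updating both maps whenever a source is added.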
3. Subscribe to telemetry events
3.1 Query sources by reading type, then subscribe to telemetry events from all of them
    if let Ok(temperature_sources) =
        telemetry_source_manager.query_sources_by_property(("ReadingType", "Temperature"))
    {
        for source in temperature_sources {
            println!("Subscribe to temperature source: {}", source);
            if let Ok(mut subscription) = telemetry_source_manager
                .subscribe_telemetries::<f64>(&source, SubscriptionType::OnChange)
            {
                tokio::spawn(async move {
                    while let Some(telemetries) = subscription.telemetry_receiver.recv().await {
                        for telemetry in telemetries {
                            println!("Received {:?}", telemetry);
                        }
                    }
                });
            }
        }
    }
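The OnChange subscription type delivers events only when a value changes. A minimal sketch of that idea, assuming a change means "differs from the last value that was emitted", might look like the following. `OnChangeFilter` is a hypothetical name, and the manager's real change-detection rule (for example, whether it applies a tolerance) is not documented here.

```rust
/// Hypothetical on-change filter; tracks the last value that was emitted.
struct OnChangeFilter {
    last_emitted: Option<f64>,
}

impl OnChangeFilter {
    fn new() -> Self {
        Self { last_emitted: None }
    }

    /// Returns `Some(value)` when the sample differs from the last emitted
    /// value, and `None` when it is a repeat that should be suppressed.
    /// Comparison is exact, so a NaN sample is always treated as a change.
    fn accept(&mut self, value: f64) -> Option<f64> {
        if self.last_emitted == Some(value) {
            return None;
        }
        self.last_emitted = Some(value);
        Some(value)
    }
}
```

Feeding every polled sample through `accept` and forwarding only the `Some` results gives subscribers a stream without consecutive duplicates.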
3.2 Subscribe to Periodical telemetry events, which will increase the polling rate of this source
    if let Ok(subscription) = telemetry_source_manager.subscribe_telemetries::<f64>(
        "Source2",
        SubscriptionType::Periodical(Duration::from_millis(1), Duration::from_millis(10)),
    ) {
        let telemetry_source_manager_clone = telemetry_source_manager.clone();
        if let Ok(sources) = telemetry_source_manager_clone.query_sources_by_names(&["Source2"]) {
            println!("Source2 before subscription dropped: {}", sources[0]);
        }
        tokio::spawn(async move {
            // Subscription handling logic...
        });
    }
Example output:

    All sources in manager:
    Source2
    Source3
    Source1
    Secondary keys: ["ReadingType", "FRU"]
    Source with FRU CPU: Source1
    Source with FRU CPU: Source3
    Subscribe to temperature source: Source1
    Subscribe to temperature source: Source2
    Source2 before subscription dropped: Source2
    Received TypedTelemetry { source_name: "Source3", telemetry_type: "OnChange", value: 66.81906494537024, timestamp: SystemTime { tv_sec: 616376, tv_nsec: 949746575 } }
    Received TypedTelemetry { source_name: "Source2", telemetry_type: "Periodical", value: 21.584232735733732, timestamp: SystemTime { tv_sec: 616376, tv_nsec: 949765075 } }
    Received TypedTelemetry { source_name: "Source1", telemetry_type: "Periodical", value: 69.1666621987969, timestamp: SystemTime { tv_sec: 616376, tv_nsec: 949819185 } }
    ...
    Subscription to Source2 dropped after 15 seconds
    Source2 after subscription dropped: Source2
    ...
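Section 3.2 notes that a Periodical subscription increases a source's polling rate. One plausible way to model this, sketched below, is to poll at the fastest interval any live subscriber requests and fall back to the source's own interval once those subscriptions are dropped. `PollingState` and its methods are hypothetical, and the sketch assumes each Periodical subscription carries a requested sampling interval; the manager's actual bookkeeping may work differently.

```rust
use std::time::Duration;

/// Hypothetical per-source bookkeeping; not the crate's actual type.
struct PollingState {
    /// Interval used when no subscriber asks for a faster one.
    default_interval: Duration,
    /// Sampling intervals requested by live Periodical subscriptions.
    requested: Vec<(u64, Duration)>,
    /// Next subscription id to hand out.
    next_id: u64,
}

impl PollingState {
    fn new(default_interval: Duration) -> Self {
        Self {
            default_interval,
            requested: Vec::new(),
            next_id: 0,
        }
    }

    /// Registers a subscriber's sampling interval and returns its id.
    fn subscribe(&mut self, sampling_interval: Duration) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.requested.push((id, sampling_interval));
        id
    }

    /// Forgets a subscriber, as would happen when its subscription is dropped.
    fn unsubscribe(&mut self, id: u64) {
        self.requested.retain(|(existing, _)| *existing != id);
    }

    /// The fastest interval any subscriber wants, never slower than the default.
    fn effective_interval(&self) -> Duration {
        self.requested
            .iter()
            .map(|(_, interval)| *interval)
            .min()
            .map_or(self.default_interval, |fastest| {
                fastest.min(self.default_interval)
            })
    }
}
```

With this model, a subscriber asking for a 1 ms interval on a source that normally polls every second temporarily raises the polling rate, and the rate returns to the default as soon as that subscriber goes away.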
This README provides an overview of the Serverless Telemetry Source Manager, its APIs, and examples of how to use it. For more detailed information, please refer to the source code and comments within the telemetry_source_manager.rs file.