
Serverless Telemetry Source Manager

Introduction

The Serverless Telemetry Source Manager aims to simplify telemetry management by eliminating the need for separate systemd services, as used in approaches like the OpenBMC D-Bus sensor daemon. This solution removes the daemon layer and inter-process communication (IPC), resulting in a more straightforward implementation.

Northbound API

The telemetry source manager exposes three main APIs:

  1. add_source(): Adds a telemetry source to (or removes one from) the manager, which then polls each registered source for value updates.
  2. query_sources(): Queries telemetry sources by primary key (the source name) or by secondary key (one of the source properties, such as the FRU the source belongs to).
  3. subscribe_telemetries(): Lets subscribers receive telemetry events, either periodically or whenever a value changes.
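
To make the primary/secondary key distinction concrete, here is a minimal, std-only sketch of a registry that answers both kinds of query. `SourceInfo`, `SourceRegistry`, and their methods are hypothetical names chosen for illustration; they are not the crate's actual types, and the real manager may index its sources differently.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical stand-in for a telemetry source: a name plus free-form properties.
#[derive(Debug, Clone)]
struct SourceInfo {
    name: String,
    properties: Vec<(String, String)>,
}

/// Hypothetical registry keyed by source name (the primary key).
#[derive(Default)]
struct SourceRegistry {
    sources: BTreeMap<String, SourceInfo>,
}

impl SourceRegistry {
    /// Registers a source; returns false if the name is already taken.
    fn add(&mut self, name: &str, properties: &[(&str, &str)]) -> bool {
        if self.sources.contains_key(name) {
            return false;
        }
        let info = SourceInfo {
            name: name.to_string(),
            properties: properties
                .iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect(),
        };
        self.sources.insert(name.to_string(), info);
        true
    }

    /// Primary-key lookup: an empty slice selects every source.
    fn by_names(&self, names: &[&str]) -> Vec<String> {
        if names.is_empty() {
            return self.sources.keys().cloned().collect();
        }
        names
            .iter()
            .filter(|n| self.sources.contains_key(**n))
            .map(|n| n.to_string())
            .collect()
    }

    /// Secondary-key lookup: sources carrying the given (key, value) property.
    fn by_property(&self, key: &str, value: &str) -> Vec<String> {
        self.sources
            .values()
            .filter(|s| s.properties.iter().any(|(k, v)| k == key && v == value))
            .map(|s| s.name.clone())
            .collect()
    }

    /// All distinct property keys currently present in the registry.
    fn secondary_keys(&self) -> BTreeSet<String> {
        self.sources
            .values()
            .flat_map(|s| s.properties.iter().map(|(k, _)| k.clone()))
            .collect()
    }
}
```

In this sketch a property query is a linear scan over all sources, which is adequate for a handful of sensors; a larger deployment might keep a reverse index from property to source names instead.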

Southbound API

Clients must provide a method to poll the telemetry value when adding a source to the manager, as defined by the asynchronous trait AsyncReadValue. For I2C sources, this can be a simple wrapper to read the I2C device via the sysfs interface. This interface abstraction facilitates integration with third-party vendors.
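
As an illustration of the "simple wrapper" for an I2C source, the sketch below reads a Linux hwmon temperature attribute through sysfs. It is a blocking, std-only approximation: `SysfsTempReader` is a hypothetical name, the path is supplied by the caller, and a real client would implement the asynchronous `AsyncReadValue` trait rather than this plain method. The only kernel convention relied on is that hwmon `temp*_input` files report millidegrees Celsius.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};
use std::time::SystemTime;

/// Hypothetical blocking reader for one hwmon temperature attribute file.
struct SysfsTempReader {
    path: PathBuf,
}

impl SysfsTempReader {
    /// `path` would typically point at a `temp*_input` file under /sys/class/hwmon.
    fn new(path: impl AsRef<Path>) -> Self {
        Self { path: path.as_ref().to_path_buf() }
    }

    /// Reads the attribute and converts millidegrees Celsius to degrees,
    /// pairing the value with the time at which it was read.
    fn get_value(&self) -> Result<(f64, SystemTime), io::Error> {
        let raw = fs::read_to_string(&self.path)?;
        let millidegrees: i64 = raw
            .trim()
            .parse()
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
        Ok((millidegrees as f64 / 1000.0, SystemTime::now()))
    }
}
```

Returning `std::io::Error` for both read and parse failures mirrors the error type used by the trait methods in the use case example, so a wrapper like this could be adapted with little glue code.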

Use case example

The src/main.rs file demonstrates a complete use case from a client's perspective.
Typically, one client handles the manager creation, but there is currently no limit on how many clients can query and subscribe simultaneously.
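
One common way to let many clients share a single manager is a cheaply cloneable handle around shared state. The sketch below shows that pattern with `Arc<RwLock<...>>` and plain threads; `SharedManager` and `add_concurrently` are hypothetical, and this is an assumption about how such sharing could work, not a description of the crate's internals.

```rust
use std::collections::BTreeSet;
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical cloneable handle: every clone points at the same set of source names.
#[derive(Clone, Default)]
struct SharedManager {
    names: Arc<RwLock<BTreeSet<String>>>,
}

impl SharedManager {
    /// Adds a name; returns false if it was already present.
    fn add(&self, name: &str) -> bool {
        self.names.write().unwrap().insert(name.to_string())
    }

    /// Returns all names in sorted order.
    fn list(&self) -> Vec<String> {
        self.names.read().unwrap().iter().cloned().collect()
    }
}

/// Spawns one thread per name, each adding through its own clone of the handle.
fn add_concurrently(manager: &SharedManager, names: &[&str]) {
    let handles: Vec<_> = names
        .iter()
        .map(|name| {
            let manager = manager.clone();
            let name = name.to_string();
            thread::spawn(move || manager.add(&name))
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}
```

Because cloning only bumps a reference count, any number of readers can hold a handle; writers serialize on the lock.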

  1. Adding telemetry sources
    Telemetry sources can be added to the manager at any time; the background polling task for a source starts automatically once a subscription to it is created:

    // Mock implementation of AsyncReadValue
    struct MockAsyncReadValue {
        // Implementation details...
    }
    
    #[async_trait::async_trait]
    impl AsyncReadValue<f64> for MockAsyncReadValue {
        async fn read_values(
            &self,
            sampling_interval: Duration,
            reporting_interval: Duration,
        ) -> Result<Vec<(f64, SystemTime)>, std::io::Error> {
            // Implementation details...
        }
    
        async fn get_value(&self) -> Result<(f64, SystemTime), std::io::Error> {
            // Implementation details...
        }
    }
    
    // The telemetry source manager can be cloned and shared by other threads/tasks freely
    let telemetry_source_manager = TelemetrySourceManager::new();
    
    let mock_reader = Arc::new(MockAsyncReadValue::new());
    
    // Add telemetry sources to the manager
    // A source can have an arbitrary number of properties as key/value pairs
    // The property key names are not restricted, though they should follow the Redfish data schema
    let source1 = TelemetrySource::new(
        "Source1".to_string(),
        Some(vec![
            ("FRU".to_string(), "CPU".to_string()),
            ("ReadingType".to_string(), "Temperature".to_string()),
        ]),
        mock_reader.clone(),
        Duration::from_secs(1),
        Duration::from_millis(100),
        Duration::from_secs(5),
    );
    telemetry_source_manager.add_source::<f64>(source1).await.unwrap();
    
    // Add more sources as needed...
    
    
  2. Query telemetry sources

    2.1 By primary key:

    // Query sources by primary keys: empty input means all sources
    if let Ok(sources) = telemetry_source_manager.query_sources_by_names(&[]) {
        println!("All sources in manager:");
        for source in sources {
            println!("{}", source);
        }
    }
    
    // Query source by primary keys: one source
    if let Ok(sources) = telemetry_source_manager.query_sources_by_names(&["Source1"]) {
        assert_eq!(sources.len(), 1);
    }
    

    2.2 By secondary keys:

    // Query all available secondary keys in the manager
    let keys = telemetry_source_manager.query_all_secondary_keys();
    println!("Secondary keys: {:?}", keys);
    
    // Query sources by secondary key
    if let Ok(sources) = telemetry_source_manager.query_sources_by_property(("FRU", "CPU")) {
        for source in sources {
            println!("Source with FRU CPU: {}", source);
        }
    }
    
  3. Subscribe to telemetry events

    3.1 Query sources by reading type, then subscribe to telemetry events from all of them

    if let Ok(temperature_sources) =
        telemetry_source_manager.query_sources_by_property(("ReadingType", "Temperature"))
    {
        for source in temperature_sources {
            println!("Subscribe to temperature source: {}", source);
            if let Ok(mut subscription) =
                telemetry_source_manager.subscribe_telemetries::<f64>(&source, SubscriptionType::OnChange)
            {
                tokio::spawn(async move {
                    while let Some(telemetries) = subscription.telemetry_receiver.recv().await {
                        for telemetry in telemetries {
                            println!("Received {:?}", telemetry);
                        }
                    }
                });
            }
        }
    }
    

    3.2 Subscribe to periodical telemetry events, which increases the polling rate of this source

    if let Ok(subscription) = telemetry_source_manager.subscribe_telemetries::<f64>(
        "Source2",
        SubscriptionType::Periodical(Duration::from_millis(1), Duration::from_millis(10)),
    ) {
        let telemetry_source_manager_clone = telemetry_source_manager.clone();
        if let Ok(sources) = telemetry_source_manager_clone.query_sources_by_names(&["Source2"]) {
            println!("Source2 before subscription dropped: {}", sources[0]);
        }
    
        tokio::spawn(async move {
            // Subscription handling logic...
        });
    }
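
The two subscription modes can be contrasted with a small std-only sketch. `Mode`, `on_change`, and `samples_per_report` are hypothetical helpers, and reading the two `Periodical` durations as (sampling interval, reporting interval) is an assumption based on the parameter order of `read_values`; the crate's real semantics may differ.

```rust
use std::time::Duration;

/// Hypothetical mirror of the two subscription modes used in the examples.
enum Mode {
    OnChange,
    /// Assumed meaning: (sampling interval, reporting interval).
    Periodical(Duration, Duration),
}

/// Keeps only the samples an on-change subscriber would see:
/// the first sample and every sample that differs from its predecessor.
fn on_change(samples: &[f64]) -> Vec<f64> {
    let mut out = Vec::new();
    for &s in samples {
        if out.last() != Some(&s) {
            out.push(s);
        }
    }
    out
}

/// Number of samples batched into one periodic report (rounded down, at least one).
/// Returns None for on-change subscriptions, which have no fixed batch size.
fn samples_per_report(mode: &Mode) -> Option<u128> {
    match mode {
        Mode::OnChange => None,
        Mode::Periodical(sampling, reporting) => {
            // Guard against a zero sampling interval to avoid dividing by zero.
            let sampling_ns = sampling.as_nanos().max(1);
            Some((reporting.as_nanos() / sampling_ns).max(1))
        }
    }
}
```

Under that assumption, `Periodical(Duration::from_millis(1), Duration::from_millis(10))` would deliver batches of ten samples, which is consistent with a periodic subscription raising the polling rate of its source.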
    

Sample output

All sources in manager:
Source2
Source3
Source1
Secondary keys: ["ReadingType", "FRU"]
Source with FRU CPU: Source1
Source with FRU CPU: Source3
Subscribe to temperature source: Source1
Subscribe to temperature source: Source2
Source2 before subscription dropped: Source2
Received TypedTelemetry { source_name: "Source3", telemetry_type: "OnChange", value: 66.81906494537024, timestamp: SystemTime { tv_sec: 616376, tv_nsec: 949746575 } }
Received TypedTelemetry { source_name: "Source2", telemetry_type: "Periodical", value: 21.584232735733732, timestamp: SystemTime { tv_sec: 616376, tv_nsec: 949765075 } }
Received TypedTelemetry { source_name: "Source1", telemetry_type: "Periodical", value: 69.1666621987969, timestamp: SystemTime { tv_sec: 616376, tv_nsec: 949819185 } }
...
Subscription to Source2 dropped after 15 seconds
Source2 after subscription dropped: Source2
...

This README provides an overview of the Serverless Telemetry Source Manager, its APIs, and examples of how to use it. For more detailed information, please refer to the source code and comments within the telemetry_source_manager.rs file.