blob: 2b7ac2ea5357571f91bca3d94993e3a34da4bd0a [file] [log] [blame] [view] [edit]
# Serverless Telemetry Source Manager
## Introduction
The Serverless Telemetry Source Manager aims to simplify telemetry management by
eliminating the need for separate systemd services, as used in approaches like
the OpenBMC [D-Bus sensor daemon](https://github.com/openbmc/dbus-sensors.git).
This solution removes the daemon layer and inter-process communication (IPC),
resulting in a more straightforward implementation.
## Northbound API
The telemetry source manager exposes three main APIs:
1. **`add_source()`**: Adds (or removes) a telemetry source to the manager,
which will then poll the source for value updates.
2. **`query_sources()`**: Queries telemetry sources by secondary key (one of the
source properties, such as the FRU it belongs to) or by primary key: source
name.
3. **`subscribe_telemetries()`**: Allows subscribers to receive telemetry
events, by periodical or when value changes.
## Southbound API
Clients must provide a method to poll the telemetry value when adding a source
to the manager, as defined by the asynchronous trait `AsyncReadValue`. For I2C
sources, this can be a simple wrapper to read the I2C device via the sysfs
interface. This interface abstraction facilitates integration with third-party
vendors.
## Use case example
The `src/main.rs` file demonstrates a complete use case from a client's
perspective.<br> Typically, one client handles the manager creation, but there
is currently no limit on how many clients can query and subscribe
simultaneously.
1. Adding telemetry sources<br> Telemetry sources can be added to the manager,
and the polling task will run in the background automatically when
subscription happens:<br>
```rust
// Mock implementation of AsyncReadValue
struct MockAsyncReadValue { // Implementation details... }
#[async_trait::async_trait]
impl AsyncReadValue<f64> for MockAsyncReadValue {
async fn read_values(
&self,
sampling_interval: Duration,
reporting_interval: Duration,
) -> Result<Vec<(f64, SystemTime)>, std::io::Error> {
// Implementation details...
}
async fn get_value(&self) -> Result<(f64, SystemTime), std::io::Error> {
// Implementation details...
}
}
// The telemetry source manager can be cloned and shared by other threads/tasks freely
let telemetry_source_manager = TelemetrySourceManager::new();
let mock_reader = Arc::new(MockAsyncReadValue::new());
// Add telemetry sources to the manager
// A source can have an arbitrary number of properties as key/value pairs
// The property key names are not restricted, though they should follow the Redfish data scheme
let source1 = TelemetrySource::new(
"Source1".to_string(),
Some(vec![
("FRU".to_string(), "CPU".to_string()),
("ReadingType".to_string(), "Temperature".to_string()),
]),
mock_reader.clone(),
Duration::from_secs(1),
Duration::from_millis(100),
Duration::from_secs(5),
);
telemetry_source_manager.add_source::<f64>(source1).await.unwrap();
// Add more sources as needed...
```
2. Query telemetry sources<br>
2.1 By primary key:
```rust
// Query sources by primary keys: empty input means all sources
if let Ok(sources) = telemetry_source_manager.query_sources_by_names(&[]) {
println!("All sources in manager:");
for source in sources {
println!("{}", source);
}
}
// Query source by primary keys: one source
if let Ok(sources) = telemetry_source_manager.query_sources_by_names(&["Source1"]) {
assert_eq!(sources.len(), 1);
}
```
2.2 By secondary keys:
```rust
// Query all available secondary keys in the manager
let keys = telemetry_source_manager.query_all_secondary_keys();
println!("Secondary keys: {:?}", keys);
// Query sources by secondary key
if let Ok(sources) = telemetry_source_manager.query_sources_by_property(("FRU", "CPU")) {
for source in sources {
println!("Source with FRU CPU: {}", source);
}
}
```
3. Subscribe to telemetry events<br>
3.1 Query sources by reading type, then subscribe to telemetry events from
all of them
```rust
if let Ok(temperature_sources) =
telemetry_source_manager.query_sources_by_property(("ReadingType", "Temperature"))
{
for source in temperature_sources {
println!("Subscribe to temperature source: {}", source);
if let Ok(mut subscription) =
telemetry_source_manager.subscribe_telemetries::<f64>(&source, SubscriptionType::OnChange)
{
tokio::spawn(async move {
while let Some(telemetries) = subscription.telemetry_receiver.recv().await {
for telemetry in telemetries {
println!("Received {:?}", telemetry);
}
}
});
}
}
}
```
3.2 Subscribe to Periodical telemetry events, which will increase the polling
rate of this source
```rust
if let Ok(subscription) = telemetry_source_manager.subscribe_telemetries::<f64>(
"Source2",
SubscriptionType::Periodical(Duration::from_millis(1), Duration::from_millis(10)),
) {
let telemetry_source_manager_clone = telemetry_source_manager.clone();
if let Ok(sources) = telemetry_source_manager_clone.query_sources_by_names(&["Source2"]) {
println!("Source2 before subscription dropped: {}", sources[0]);
}
tokio::spawn(async move {
// Subscription handling logic...
});
}
```
## Sample output
```sh
All sources in manager:
Source2
Source3
Source1
Secondary keys: ["ReadingType", "FRU"]
Source with FRU CPU: Source1
Source with FRU CPU: Source3
Subscribe to temperature source: Source1
Subscribe to temperature source: Source2
Source2 before subscription dropped: Source2
Received TypedTelemetry { source_name: "Source3", telemetry_type: "OnChange", value: 66.81906494537024, timestamp: SystemTime { tv_sec: 616376, tv_nsec: 949746575 } }
Received TypedTelemetry { source_name: "Source2", telemetry_type: "Periodical", value: 21.584232735733732, timestamp: SystemTime { tv_sec: 616376, tv_nsec: 949765075 } }
Received TypedTelemetry { source_name: "Source1", telemetry_type: "Periodical", value: 69.1666621987969, timestamp: SystemTime { tv_sec: 616376, tv_nsec: 949819185 } }
...
Subscription to Source2 dropped after 15 seconds
Source2 after subscription dropped: Source2
...
```
This README provides an overview of the Serverless Telemetry Source Manager, its
APIs, and examples of how to use it. For more detailed information, please refer
to the source code and comments within the `telemetry_source_manager.rs` file.