Surface sensor status in HFT stream Payloads
The current behavior of Metfuego streaming will not return any payload for the whole subscription if an error is encountered while collecting data for the stream. This may occur in situations where a sensor has become stale or when sensors have not yet been initialized e.g. IntelCpuSensors.
We change this behavior to always report all sensors requested in an HFT subscription. Sensors with a CREATION_PENDING status will report no readings, but will have an entry in the payload with the identifier, status, and status message. Sensors with a stale status report any remaining valid readings from their last updated timestamp, along with identifier, status, and status message.
This change also includes addressing an assumption that all sensors have an entry in the `sensor_key_to_task_id` map, this is not true for uninitialized sensors (`STATUS_CREATION_PENDING`), so configuring the collection of such sensors should be bypassed.
#tlbmc
#tlbmc-hft
PiperOrigin-RevId: 820787927
Change-Id: I5b227ae480f84d64730d2f3abc655bc8cd72790b
diff --git a/tlbmc/collector/sensor_collector.cc b/tlbmc/collector/sensor_collector.cc
index d36942d..463dc28 100644
--- a/tlbmc/collector/sensor_collector.cc
+++ b/tlbmc/collector/sensor_collector.cc
@@ -724,10 +724,16 @@
// board config.
if (config.key.empty()) {
for (auto& [key, sensor] : key_to_sensor) {
+ if (sensor->GetSensorAttributesDynamic().state().status() ==
+ STATUS_CREATION_PENDING) {
+ LOG(WARNING) << "Skipping configuration for sensor " << key
+ << " because it is pending creation.";
+ continue;
+ }
absl::Duration interval = get_sampling_interval(sensor, config);
ResizeBufferAndMetrics(sensor, config.max_batch_size);
// It's assumed sensor_key_to_task_id always contains the key if it's in
- // sensor_table_
+ // sensor_table_ and does not have a pending creation status.
{
absl::MutexLock lock(&thread_manager_->key_to_task_id_mutex);
thread_manager_->task_scheduler->UpdateTaskPeriod(
@@ -742,6 +748,12 @@
// given key.
auto it = key_to_sensor.find(config.key);
if (it != key_to_sensor.end()) {
+ if (it->second->GetSensorAttributesDynamic().state().status() ==
+ STATUS_CREATION_PENDING) {
+ LOG(WARNING) << "Skipping configuration for sensor " << config.key
+ << " because it is pending creation.";
+ return absl::OkStatus();
+ }
absl::Duration interval = get_sampling_interval(it->second, config);
{
absl::MutexLock lock(&thread_manager_->key_to_task_id_mutex);
@@ -828,7 +840,7 @@
bus_to_v1_accessor, bus_to_v2_accessor));
ScheduleNicSensorAccessorRefresh(bus_to_v1_accessor, bus_to_v2_accessor,
- sensor_group_to_io_context, *thread_manager);
+ sensor_group_to_io_context, *thread_manager);
for (const auto& [sensor_group, io_context] : sensor_group_to_io_context) {
LOG(INFO) << "Starting io_context thread for sensor group: "
diff --git a/tlbmc/resource.proto b/tlbmc/resource.proto
index deace92..bcee5cf 100644
--- a/tlbmc/resource.proto
+++ b/tlbmc/resource.proto
@@ -23,6 +23,7 @@
}
// State of telemetry data.
+// LINT.IfChange
enum Status {
// Data has not been initialized by the collector.
// Default state on boot.
@@ -40,6 +41,7 @@
// set its creation as pending.
STATUS_CREATION_PENDING = 4;
}
+// LINT.ThenChange(//depot/google3/third_party/milotic/external/cc/hft/types/sensor_payload.proto)
// Type of the resource object.
enum ResourceType {
diff --git a/tlbmc/sensor_payload.proto b/tlbmc/sensor_payload.proto
index 8f58bf3..ecc985b 100644
--- a/tlbmc/sensor_payload.proto
+++ b/tlbmc/sensor_payload.proto
@@ -4,6 +4,29 @@
import "sensor_identifier.proto";
+// State of telemetry data. This should be updated if necessary to match the
+// tlBMC Status enum.
+// LINT.IfChange
+enum Status {
+ // Data has not been initialized by the collector.
+ // Default state on boot.
+ STATUS_UNKNOWN = 0;
+ // Fresh data ready to be served.
+ STATUS_OK = 1;
+ // State when data refresh timed out or an error occurred preventing future
+ // refreshes.
+ STATUS_STALE = 2;
+ // State when sensor creation failed or is pending some precondition to be
+ // created.
+ STATUS_MISSING = 3;
+}
+// LINT.ThenChange(//depot/google3/third_party/milotic/external/cc/tlbmc/store/store_hft_adapter.cc)
+
+message State {
+ Status status = 1;
+ string status_message = 2;
+}
+
message HighFrequencySensorReading {
oneof reading {
float float_reading = 1;
@@ -16,9 +39,12 @@
message HighFrequencySensorsReadings {
repeated HighFrequencySensorReading timestamped_readings = 1;
SensorIdentifier sensor_identifier = 2;
+ State state = 3;
}
message HighFrequencySensorsReadingsBatch {
repeated HighFrequencySensorsReadings high_frequency_sensors = 1;
- int32 configured_sampling_interval_ms = 2; // If set, it means all sensors in high_frequency_sensors has been configured to the same sampling interval
+ int32 configured_sampling_interval_ms =
+ 2; // If set, it means all sensors in high_frequency_sensors has been
+ // configured to the same sampling interval
}
diff --git a/tlbmc/store/store_hft_adapter.cc b/tlbmc/store/store_hft_adapter.cc
index 5dd02a1..e799ee7 100644
--- a/tlbmc/store/store_hft_adapter.cc
+++ b/tlbmc/store/store_hft_adapter.cc
@@ -63,19 +63,26 @@
return absl::NotFoundError(
absl::Substitute("Sensor $0 not found", sensor_identifier.name()));
}
+ const SensorAttributesDynamic dynamic_attributes =
+ sensor->GetSensorAttributesDynamic();
batch->set_configured_sampling_interval_ms(
- sensor->GetSensorAttributesDynamic().refresh_interval().nanos() /
- 1000000);
+ dynamic_attributes.refresh_interval().nanos() / 1000000);
milotic_hft::HighFrequencySensorsReadings* sensors =
batch->add_high_frequency_sensors();
milotic_hft::SensorIdentifier* sensor_identifier_from_readings =
sensors->mutable_sensor_identifier();
+ sensors->mutable_state()->set_status(
+ ConvertToHftStatus(dynamic_attributes.state().status()));
+ if (dynamic_attributes.state().has_status_message()) {
+ sensors->mutable_state()->set_status_message(
+ dynamic_attributes.state().status_message());
+ }
const SensorAttributesStatic& static_attributes =
sensor->GetSensorAttributesStatic();
// Replace the local root identifier with /phys when exporting the devpath.
ECCLESIA_ASSIGN_OR_RETURN(const std::string& devpath,
- store->GetDevpathFromSensor(sensor));
+ store->GetDevpathFromSensor(sensor));
if (!absl::StartsWith(devpath, "/phys")) {
std::vector<std::string> devpath_parts = absl::StrSplit(devpath, '/');
if (devpath_parts.size() < 2) {
@@ -158,6 +165,22 @@
}
}
+milotic_hft::Status ConvertToHftStatus(milotic_tlbmc::Status status) {
+ // Translate the tlBMC status to the corresponding HFT status.
+ // This should be updated to handle new HFT statuses as needed.
+ switch (status) {
+ case milotic_tlbmc::STATUS_READY:
+ return milotic_hft::STATUS_OK;
+ case milotic_tlbmc::STATUS_STALE:
+ return milotic_hft::STATUS_STALE;
+ case milotic_tlbmc::STATUS_CREATION_FAILED:
+ case milotic_tlbmc::STATUS_CREATION_PENDING:
+ return milotic_hft::STATUS_MISSING;
+ default:
+ return milotic_hft::STATUS_UNKNOWN;
+ }
+}
+
absl::StatusOr<std::unique_ptr<StoreHftAdapter>> StoreHftAdapter::Create(
Store* store) {
return absl::WrapUnique(new StoreHftAdapter(store));
diff --git a/tlbmc/store/store_hft_adapter.h b/tlbmc/store/store_hft_adapter.h
index 71b2885..76c37c8 100644
--- a/tlbmc/store/store_hft_adapter.h
+++ b/tlbmc/store/store_hft_adapter.h
@@ -12,6 +12,7 @@
#include "identifier.pb.h"
#include "payload.pb.h"
#include "sensor_identifier.pb.h"
+#include "sensor_payload.pb.h"
#include "subscription_params.pb.h"
#include "sensor.pb.h"
#include "tlbmc/store/store.h"
@@ -24,6 +25,9 @@
// Simple helper function to convert a SensorUnit to a HFT SensorUnits.
milotic_hft::SensorUnits ConvertToHftUnit(SensorUnit unit);
+// Simple helper function to convert a Status to a HFT Status.
+milotic_hft::Status ConvertToHftStatus(Status status);
+
// An adapter to the Store interface for the High Frequency Telemetry (HFT)
// framework.
class StoreHftAdapter : public milotic_hft::DataSource {
diff --git a/tlbmc/subscription/manager_impl.cc b/tlbmc/subscription/manager_impl.cc
index a4f5ba3..c04462b 100644
--- a/tlbmc/subscription/manager_impl.cc
+++ b/tlbmc/subscription/manager_impl.cc
@@ -34,6 +34,7 @@
#include "payload.pb.h"
#include "sensor_payload.pb.h"
#include "subscription_params.pb.h"
+#include "resource.pb.h"
#include "tlbmc/scheduler/scheduler.h"
namespace milotic_hft {
@@ -451,8 +452,24 @@
if (!data.ok() || data->ByteSizeLong() == 0) {
LOG(ERROR) << "Failed to collect data for subscription " << self->id_
<< ": " << data.status();
- on_done();
- return;
+ // In this case, there is no valid payload created. Create a placeholder
+ // payload with STATUS_STALE and status message to be able to deliver.
+ Payload empty_payload;
+ HighFrequencySensorsReadings* sensor_readings =
+ empty_payload.mutable_high_frequency_sensors_readings_batch()
+ ->mutable_high_frequency_sensors()
+ ->Add();
+
+ *sensor_readings->mutable_sensor_identifier() =
+ identifier.sensor_identifier();
+ milotic_hft::State state;
+ state.set_status(milotic_hft::STATUS_MISSING);
+ state.set_status_message(
+ absl::StrCat("Failed to collect data for identifier ", identifier,
+ " with status: ", data.status()));
+ *sensor_readings->mutable_state() = state;
+ consolidated_data.MergeFrom(empty_payload);
+ continue;
}
// Resample the data to the desired sampling interval.
@@ -466,8 +483,8 @@
if (!resampled_data.ok()) {
LOG(ERROR) << "Failed to resample data for subscription " << self->id_
<< ": " << resampled_data.status();
- on_done();
- return;
+ consolidated_data.MergeFrom(*data);
+ continue;
}
}
@@ -477,8 +494,8 @@
if (!updated_sample_time.ok()) {
LOG(ERROR) << "Failed to get last sampled time for subscription "
<< self->id_ << ": " << updated_sample_time.status();
- on_done();
- return;
+ consolidated_data.MergeFrom(*resampled_data);
+ continue;
}
last_sampled_time = *updated_sample_time;