// blob: 840ba8d2aa6a8f8834137315ef335f6e4a6ea4b3 [file] [log] [blame]
#include "tlbmc/collector/sensor_collector.h"
#include <algorithm>
#include <array>
#include <climits>
#include <cstddef>
#include <cstring>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/functional/any_invocable.h"
#include "absl/log/log.h"
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/match.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/strings/substitute.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "absl/types/span.h"
#include "boost/asio.hpp" //NOLINT: boost::asio is commonly used in BMC
#include "g3/macros.h"
#include "thread/thread.h"
#include "nlohmann/json.hpp"
#include "tlbmc/collector/peci_scanner.h"
#include "fan_controller_config.pb.h"
#include "fan_pwm_config.pb.h"
#include "fan_tach_config.pb.h"
#include "hwmon_temp_sensor_config.pb.h"
#include "intel_cpu_sensor_config.pb.h"
#include "psu_sensor_config.pb.h"
#include "shared_mem_sensor_config.pb.h"
#include "virtual_sensor_config.pb.h"
#include "tlbmc/expression/expression.h"
#include "tlbmc/hal/nic_veeprom/interface.h"
#include "tlbmc/hal/sysfs/hwmon.h"
#include "resource.pb.h"
#include "sensor.pb.h"
#include "tlbmc/sensors/fan_controller.h"
#include "tlbmc/sensors/fan_pwm.h"
#include "tlbmc/sensors/fan_tach.h"
#include "tlbmc/sensors/hwmon_temp_sensor.h"
#include "tlbmc/sensors/intel_cpu_sensor.h"
#include "tlbmc/sensors/nic_sensor.h"
#include "tlbmc/sensors/psu_sensor.h"
#include "tlbmc/sensors/sensor.h"
#include "tlbmc/sensors/shared_mem_based_sensor.h"
#include "tlbmc/sensors/virtual_sensor.h"
#include "tlbmc/time/time.h"
#include "google/protobuf/json/json.h"
#include "google/protobuf/util/json_util.h"
// Fallback period between two reads of a sensor. It is used when no override
// interval is supplied and the sensor's static refresh interval is not
// positive.
constexpr absl::Duration kDefaultSensorSamplingInterval =
absl::Milliseconds(1000);
namespace milotic_tlbmc {
namespace {
// Names of the io_context groups used when a sensor config does not specify
// its own sensor group. Each distinct name maps to one io_context in the
// sensor-group-to-io_context table built by SensorCollector::Create.

// Group for hwmon temperature sensors without an explicit sensor group.
constexpr absl::string_view kDefaultHwmonContext = "DEFAULT_HWMON_CONTEXT";
// Group for PSU sensors without an explicit sensor group.
constexpr absl::string_view kDefaultPsuContext = "DEFAULT_PSU_CONTEXT";
// Group for fan PWM and fan tachometer sensors without an explicit sensor
// group.
constexpr absl::string_view kDefaultFanContext = "DEFAULT_FAN_CONTEXT";
// Group shared by all virtual sensors.
constexpr absl::string_view kDefaultVirtualSensorContext =
"DEFAULT_VIRTUAL_SENSOR_CONTEXT";
// Group for Intel CPU sensors without an explicit sensor group.
constexpr absl::string_view kDefaultIntelCpuContext =
"DEFAULT_INTEL_CPU_CONTEXT";
// Group shared by all NIC telemetry sensors.
constexpr absl::string_view kDefaultNicTelemetryContext =
"DEFAULT_NIC_TELEMETRY_CONTEXT";
// Group on which the periodic NIC accessor refresh work is posted.
constexpr absl::string_view kDefaultNicTelemetryRefreshContext =
"DEFAULT_NIC_TELEMETRY_REFRESH_CONTEXT";
// Resizes the reading buffer of `sensor` and resets its metrics.
//
// `buffer_size_from_config` equal to INT_MIN (after the implicit conversion to
// size_t) is a sentinel: the function returns without touching the sensor.
// Otherwise the buffer is resized to the larger of the requested size and the
// queue size from the sensor's static attributes. A request that is smaller
// than (or equal to) that default therefore resets the buffer to the default.
void ResizeBufferAndMetrics(const std::shared_ptr<Sensor>& sensor,
                            size_t buffer_size_from_config) {
  if (buffer_size_from_config == INT_MIN) {
    return;
  }
  const size_t default_buffer_size =
      sensor->GetSensorAttributesStatic().entity_common_config().queue_size();
  // Never shrink below the statically configured queue size.
  sensor->ResizeBuffer(
      std::max<size_t>(buffer_size_from_config, default_buffer_size));
  sensor->ResetMetrics();
}
void ScheduleIndividualSensorRead(
const std::shared_ptr<Sensor>& sensor,
const SensorNotification* refresh_notification,
std::optional<int> override_sensor_sampling_interval_ms,
ThreadManager& thread_manager) {
absl::Duration interval = kDefaultSensorSamplingInterval;
if (override_sensor_sampling_interval_ms.has_value()) {
LOG(INFO) << "Overriding sensor sampling interval to "
<< *override_sensor_sampling_interval_ms;
interval = absl::Milliseconds(*override_sensor_sampling_interval_ms);
} else {
absl::Duration static_refresh_interval = DecodeGoogleApiProto(
sensor->GetSensorAttributesStatic().static_refresh_interval());
if (static_refresh_interval > absl::ZeroDuration()) {
interval = static_refresh_interval;
}
}
Status status = sensor->GetSensorAttributesDynamic().state().status();
if (status == STATUS_CREATION_FAILED || status == STATUS_CREATION_PENDING) {
LOG(INFO) << "Skipping schedule sensor read for " << sensor->GetKey()
<< " because sensor initialization failed. Status: " << status;
return;
}
LOG(INFO) << "Scheduling Sensor Read! Sensor key is " << sensor->GetKey()
<< " and interval is " << interval;
int task_id = thread_manager.task_scheduler->RunAndScheduleAsync(
[sensor = std::weak_ptr<Sensor>(sensor),
refresh_notification =
refresh_notification](absl::AnyInvocable<void()> on_done) {
std::shared_ptr<Sensor> sensor_locked = sensor.lock();
if (!sensor_locked) {
return;
}
sensor_locked->RefreshOnceAsync(
[refresh_notification = refresh_notification,
on_done = std::move(on_done)](
const std::shared_ptr<const SensorValue>& sensor_data) mutable {
if (refresh_notification != nullptr) {
refresh_notification->NotifyWithData(sensor_data);
}
on_done();
});
},
interval);
{
absl::MutexLock lock(&thread_manager.key_to_task_id_mutex);
thread_manager.sensor_key_to_task_id[sensor->GetKey()] = task_id;
}
}
// Schedules a periodic read for every sensor in `sensors` by delegating to
// ScheduleIndividualSensorRead with the same notification, override interval
// and thread manager.
void ScheduleAllSensorReads(
    const std::vector<std::shared_ptr<Sensor>>& sensors,
    const SensorNotification* refresh_notification,
    std::optional<int> override_sensor_sampling_interval_ms,
    ThreadManager& thread_manager) {
  LOG(INFO) << "Scheduling Sensor Read! Total sensor count is "
            << sensors.size();
  for (const auto& sensor : sensors) {
    ScheduleIndividualSensorRead(sensor, refresh_notification,
                                 override_sensor_sampling_interval_ms,
                                 thread_manager);
  }
}
// Looks up the io_context registered for `sensor_group`, creating and
// registering a new one on first use. A sensor group should uniquely identify
// sensors that share the same i2c device (or are otherwise related), so that
// HFT reads are fast and contention over the bus mutex is minimized. See
// b/434024387 for details.
std::shared_ptr<boost::asio::io_context> GetOrCreateIoContextForSensorGroup(
    absl::flat_hash_map<std::string, std::shared_ptr<boost::asio::io_context>>&
        sensor_group_to_io_context,
    absl::string_view sensor_group) {
  // operator[] inserts an empty slot when the group is seen for the first time.
  std::shared_ptr<boost::asio::io_context>& slot =
      sensor_group_to_io_context[sensor_group];
  if (!slot) {
    slot = std::make_shared<boost::asio::io_context>();
  }
  return slot;
}
// Creates the hwmon temperature sensors described by
// `params.sensor_configs.hwmon_temp_sensor_configs` and appends them to
// `sensors`.
//
// Each config's sensors run on the io_context of the config's sensor group
// (kDefaultHwmonContext when the config has none). Returns the first creation
// error; sensors appended before the failure stay in `sensors`.
absl::Status CreateHwmonSensors(
    const SensorCollector::Params& params,
    std::vector<std::shared_ptr<Sensor>>& sensors,
    absl::flat_hash_map<std::string, std::shared_ptr<boost::asio::io_context>>&
        sensor_group_to_io_context) {
  for (const auto& config : params.sensor_configs.hwmon_temp_sensor_configs) {
    absl::string_view sensor_group = config.has_sensor_group()
                                         ? config.sensor_group()
                                         : kDefaultHwmonContext;
    ECCLESIA_ASSIGN_OR_RETURN(
        std::vector<std::shared_ptr<Sensor>> hwmon_temp_sensors,
        HwmonTempSensor::Create(config,
                                GetOrCreateIoContextForSensorGroup(
                                    sensor_group_to_io_context, sensor_group),
                                params.i2c_sysfs));
    // We want to use syslog to track device creation status
    LOG(INFO) << absl::Substitute("Created $0 HWmon sensors at $1",
                                  hwmon_temp_sensors.size(),
                                  config.hal_common_config());
    sensors.insert(sensors.end(), hwmon_temp_sensors.begin(),
                   hwmon_temp_sensors.end());
  }
  return absl::OkStatus();
}
// Creates the PSU sensors described by `params.sensor_configs.psu_sensor_configs`
// and appends them to `sensors`.
//
// Each config's sensors run on the io_context of the config's sensor group
// (kDefaultPsuContext when the config has none). The i3c sysfs accessor is
// used for configs whose type PsuSensor::IsI3cPsuSensor accepts; all others
// use the i2c one. Returns the first creation error; sensors appended before
// the failure stay in `sensors`.
absl::Status CreatePsuSensors(
    const SensorCollector::Params& params,
    std::vector<std::shared_ptr<Sensor>>& sensors,
    absl::flat_hash_map<std::string, std::shared_ptr<boost::asio::io_context>>&
        sensor_group_to_io_context) {
  for (const auto& config : params.sensor_configs.psu_sensor_configs) {
    absl::string_view sensor_group =
        config.has_sensor_group() ? config.sensor_group() : kDefaultPsuContext;
    // Default to use i2c
    HwmonSysfs* hwmon_sysfs = &params.i2c_sysfs;
    if (PsuSensor::IsI3cPsuSensor(config.type())) {
      hwmon_sysfs = &params.i3c_sysfs;
    }
    ECCLESIA_ASSIGN_OR_RETURN(
        std::vector<std::shared_ptr<Sensor>> psu_sensors,
        PsuSensor::Create(config,
                          GetOrCreateIoContextForSensorGroup(
                              sensor_group_to_io_context, sensor_group),
                          *hwmon_sysfs));
    // We want to use syslog to track device creation status
    LOG(INFO) << absl::Substitute("Created $0 PSU sensors at $1",
                                  psu_sensors.size(),
                                  config.hal_common_config());
    sensors.insert(sensors.end(), psu_sensors.begin(), psu_sensors.end());
  }
  return absl::OkStatus();
}
// Creates fan PWM and fan tachometer sensors and appends them to `sensors`.
//
// Every PWM/tach config is bound to the first controller in `fan_controllers`
// that reports owning it via ControllerHasSensor; configs that no controller
// owns are skipped. Sensors run on the io_context of the config's sensor
// group (kDefaultFanContext when the config has none). Returns the first
// creation error.
absl::Status CreateFanSensors(
    const SensorCollector::Params& params,
    absl::Span<const std::shared_ptr<FanController>> fan_controllers,
    std::vector<std::shared_ptr<Sensor>>& sensors,
    absl::flat_hash_map<std::string, std::shared_ptr<boost::asio::io_context>>&
        sensor_group_to_io_context) {
  int created = 0;

  for (const auto& config : params.sensor_configs.fan_pwm_configs) {
    const auto owner = std::find_if(
        fan_controllers.begin(), fan_controllers.end(),
        [&config](const std::shared_ptr<FanController>& controller) {
          return controller->ControllerHasSensor(config.hal_common_config());
        });
    if (owner == fan_controllers.end()) {
      continue;
    }
    absl::string_view sensor_group = kDefaultFanContext;
    if (config.has_sensor_group()) {
      sensor_group = config.sensor_group();
    }
    ECCLESIA_ASSIGN_OR_RETURN(
        std::shared_ptr<Sensor> pwm_sensor,
        FanPwm::Create(config, **owner,
                       GetOrCreateIoContextForSensorGroup(
                           sensor_group_to_io_context, sensor_group),
                       params.i2c_sysfs));
    sensors.push_back(std::move(pwm_sensor));
    ++created;
  }

  for (const auto& config : params.sensor_configs.fan_tach_configs) {
    const auto owner = std::find_if(
        fan_controllers.begin(), fan_controllers.end(),
        [&config](const std::shared_ptr<FanController>& controller) {
          return controller->ControllerHasSensor(config.hal_common_config());
        });
    if (owner == fan_controllers.end()) {
      continue;
    }
    absl::string_view sensor_group = kDefaultFanContext;
    if (config.has_sensor_group()) {
      sensor_group = config.sensor_group();
    }
    ECCLESIA_ASSIGN_OR_RETURN(
        std::shared_ptr<Sensor> tach_sensor,
        FanTachometer::Create(config, **owner,
                              GetOrCreateIoContextForSensorGroup(
                                  sensor_group_to_io_context, sensor_group),
                              params.i2c_sysfs));
    sensors.push_back(std::move(tach_sensor));
    ++created;
  }

  // The count goes to syslog so device creation can be tracked.
  LOG(INFO) << absl::Substitute("Created $0 Fan PWM/Tach sensors", created);
  return absl::OkStatus();
}
// Creates one shared-memory based sensor per entry in
// `params.sensor_configs.shared_mem_sensor_configs` and appends it to
// `sensors`.
//
// All of them share a single dedicated io_context. That context and its work
// guard are handed to `thread_manager` only if at least one sensor was
// created. Returns the first creation error.
absl::Status CreateSharedMemSensors(
    const SensorCollector::Params& params,
    std::vector<std::shared_ptr<Sensor>>& sensors,
    ThreadManager& thread_manager) {
  auto shared_context = std::make_shared<boost::asio::io_context>();
  auto guard = boost::asio::make_work_guard(*shared_context);

  bool created_any = false;
  for (const auto& config : params.sensor_configs.shared_mem_sensor_configs) {
    ECCLESIA_ASSIGN_OR_RETURN(
        std::shared_ptr<Sensor> created,
        SharedMemBasedSensor::Create(config, shared_context));
    // Logged to syslog so device creation can be tracked.
    LOG(INFO) << absl::Substitute("Created SharedMem sensor: $0",
                                  config.name());
    sensors.push_back(std::move(created));
    created_any = true;
  }

  if (!created_any) {
    return absl::OkStatus();
  }
  thread_manager.work_guards.push_back(std::move(guard));
  thread_manager.io_contexts.push_back(std::move(shared_context));
  return absl::OkStatus();
}
// Creates the virtual sensors described by
// `params.sensor_configs.virtual_sensor_configs` and appends them to `sensors`.
//
// A virtual sensor's expression may only reference sensors that are already in
// `sensors` when this function is called; an unknown reference yields
// InvalidArgumentError. Parse and creation errors are returned as-is. Every
// virtual sensor runs on the kDefaultVirtualSensorContext io_context, which is
// registered even when there are no virtual sensor configs.
absl::Status CreateVirtualSensors(
    const SensorCollector::Params& params,
    std::vector<std::shared_ptr<Sensor>>& sensors,
    absl::flat_hash_map<std::string, std::shared_ptr<boost::asio::io_context>>&
        sensor_group_to_io_context) {
  // Index the sensors created so far by key for reference resolution.
  absl::flat_hash_map<std::string, std::shared_ptr<Sensor>> known_sensors;
  for (const std::shared_ptr<Sensor>& existing : sensors) {
    known_sensors[existing->GetKey()] = existing;
  }

  const std::shared_ptr<boost::asio::io_context> virtual_context =
      GetOrCreateIoContextForSensorGroup(sensor_group_to_io_context,
                                         kDefaultVirtualSensorContext);

  for (const auto& config : params.sensor_configs.virtual_sensor_configs) {
    absl::flat_hash_map<std::string, std::shared_ptr<Sensor>> references;
    ECCLESIA_ASSIGN_OR_RETURN(std::unique_ptr<Expression> parsed,
                              Parse(config.expression()));
    absl::flat_hash_set<std::string> variable_names;
    parsed->GetRequiredVariables(variable_names);

    for (const auto& variable : variable_names) {
      const auto match = known_sensors.find(variable);
      if (match != known_sensors.end()) {
        references.emplace(variable, match->second);
        continue;
      }
      return absl::InvalidArgumentError(
          absl::StrCat("Reference sensor not found: ", variable));
    }

    ECCLESIA_ASSIGN_OR_RETURN(
        std::shared_ptr<Sensor> created,
        VirtualSensor::Create(config, virtual_context, references,
                              std::move(parsed)));
    LOG(INFO) << "Created Virtual sensor: " << config.name();
    sensors.push_back(std::move(created));
  }
  return absl::OkStatus();
}
// Creates one NIC sensor per telemetry name of every NIC telemetry config and
// appends it to `sensors`.
//
// Accessors are created lazily through `params.nic_accessor_factory`, one per
// (telemetry version, bus) pair, and stored in `bus_to_v1_accessor` or
// `bus_to_v2_accessor`. Returns InternalError when the config's version is
// neither V1 nor V2 or when no accessor could be obtained; sensor creation
// errors are returned as-is. All NIC sensors share the
// kDefaultNicTelemetryContext io_context.
absl::Status CreateNicSensors(
    const SensorCollector::Params& params,
    std::vector<std::shared_ptr<Sensor>>& sensors,
    absl::flat_hash_map<std::string, std::shared_ptr<boost::asio::io_context>>&
        sensor_group_to_io_context,
    absl::flat_hash_map<int, std::unique_ptr<nic_veeprom::Accessor>>&
        bus_to_v1_accessor,
    absl::flat_hash_map<int, std::unique_ptr<nic_veeprom::Accessor>>&
        bus_to_v2_accessor) {
  // Existing assumption on
  // https://gbmc-internal.git.corp.google.com/diorite-hss/+/8c81357e247e55df1ee2e3799f32550d43124c46/find_diorite.cpp#223
  constexpr int kNicVirtualEepromAddress = 75;
  constexpr absl::string_view kNicTelemetryNamePrefix = "NIC_TELEMETRY_NAME_";
  constexpr absl::string_view kNoAccessorMessage =
      "Failed to create accessor for NIC telemetry config: no accessor";

  const std::shared_ptr<boost::asio::io_context> nic_context =
      GetOrCreateIoContextForSensorGroup(sensor_group_to_io_context,
                                         kDefaultNicTelemetryContext);

  for (const auto& config : params.sensor_configs.nic_telemetry_configs) {
    const int bus = static_cast<int>(config.hal_common_config().bus());

    // Select the accessor table that matches the telemetry version.
    absl::flat_hash_map<int, std::unique_ptr<nic_veeprom::Accessor>>*
        accessors_for_version = nullptr;
    if (config.version() == nic_veeprom::NIC_TELEMETRY_VERSION_V1) {
      accessors_for_version = &bus_to_v1_accessor;
    } else if (config.version() == nic_veeprom::NIC_TELEMETRY_VERSION_V2) {
      accessors_for_version = &bus_to_v2_accessor;
    }
    if (accessors_for_version == nullptr) {
      return absl::InternalError(kNoAccessorMessage);
    }

    // One accessor per bus: create it the first time the bus is seen.
    std::unique_ptr<nic_veeprom::Accessor>& slot =
        (*accessors_for_version)[bus];
    if (slot == nullptr) {
      slot = params.nic_accessor_factory(config.version(), bus,
                                         kNicVirtualEepromAddress);
    }
    nic_veeprom::Accessor* const accessor = slot.get();
    if (accessor == nullptr) {
      return absl::InternalError(kNoAccessorMessage);
    }

    for (const auto& raw_name : config.telemetry_names()) {
      const auto telemetry_name =
          static_cast<nic_veeprom::NicTelemetryName>(raw_name);
      // Sensor name is "<board config key>_<enum name without prefix>".
      std::string short_name =
          nic_veeprom::NicTelemetryName_Name(telemetry_name);
      if (absl::StartsWith(short_name, kNicTelemetryNamePrefix)) {
        short_name.erase(0, kNicTelemetryNamePrefix.size());
      }
      const std::string sensor_name = absl::StrCat(
          config.entity_common_config().board_config_key(), "_", short_name);
      // For now, unit won't be set. Sensor unit can be derived from the sensor
      // name.
      ECCLESIA_ASSIGN_OR_RETURN(
          std::shared_ptr<Sensor> created,
          NicSensor::Create(sensor_name, telemetry_name,
                            SensorUnit::UNIT_UNKNOWN,
                            config.entity_common_config(), ThresholdConfigs(),
                            ReadingRangeConfigs(), nic_context, *accessor));
      sensors.push_back(std::move(created));
    }
  }
  return absl::OkStatus();
}
// Builds one FanController per entry in
// `params.sensor_configs.fan_controller_configs`, using the i2c sysfs
// accessor. Returns the controllers in config order, or the first creation
// error.
absl::StatusOr<std::vector<std::shared_ptr<FanController>>>
CreateFanControllers(const SensorCollector::Params& params) {
  std::vector<std::shared_ptr<FanController>> controllers;
  for (const auto& controller_config :
       params.sensor_configs.fan_controller_configs) {
    ECCLESIA_ASSIGN_OR_RETURN(
        std::shared_ptr<FanController> controller,
        FanController::Create(controller_config, params.i2c_sysfs));
    // Logged to syslog so device creation can be tracked.
    LOG(INFO) << absl::Substitute("Created 1 fan controller at $0",
                                  controller_config.hal_common_config());
    controllers.push_back(std::move(controller));
  }
  return controllers;
}
// Creates the initial Intel CPU sensors for every entry in
// `params.sensor_configs.intel_cpu_sensor_configs` and appends them to
// `sensors`.
//
// Sensors run on the io_context of the config's sensor group
// (kDefaultIntelCpuContext when the config has none) and use the PECI sysfs
// accessor. Returns the first creation error.
absl::Status CreateIntelCpuSensors(
    const SensorCollector::Params& params,
    std::vector<std::shared_ptr<Sensor>>& sensors,
    absl::flat_hash_map<std::string, std::shared_ptr<boost::asio::io_context>>&
        sensor_group_to_io_context) {
  for (const auto& cpu_config :
       params.sensor_configs.intel_cpu_sensor_configs) {
    absl::string_view group_name = kDefaultIntelCpuContext;
    if (cpu_config.has_sensor_group()) {
      group_name = cpu_config.sensor_group();
    }
    std::shared_ptr<boost::asio::io_context> group_context =
        GetOrCreateIoContextForSensorGroup(sensor_group_to_io_context,
                                           group_name);
    ECCLESIA_ASSIGN_OR_RETURN(
        std::vector<std::shared_ptr<Sensor>> created,
        IntelCpuSensor::CreateInitialSensors(
            cpu_config, std::move(group_context), params.peci_sysfs));
    // Logged to syslog so device creation can be tracked.
    LOG(INFO) << absl::Substitute("Created $0 Intel CPU sensors at $1",
                                  created.size(),
                                  cpu_config.hal_common_config());
    for (std::shared_ptr<Sensor>& cpu_sensor : created) {
      sensors.push_back(std::move(cpu_sensor));
    }
  }
  return absl::OkStatus();
}
// Consumes `sensors` and indexes them as
// table[board config key][sensor key] -> sensor. If two sensors share both
// keys, the one appearing later in `sensors` replaces the earlier one.
absl::flat_hash_map<std::string,
                    absl::flat_hash_map<std::string, std::shared_ptr<Sensor>>>
CreateSensorsTable(std::vector<std::shared_ptr<Sensor>>&& sensors) {
  absl::flat_hash_map<std::string,
                      absl::flat_hash_map<std::string, std::shared_ptr<Sensor>>>
      table;
  for (std::shared_ptr<Sensor>& entry : sensors) {
    // Read both keys while `entry` still owns the sensor.
    const auto& board_key = entry->GetBoardConfigKey();
    const auto& sensor_key = entry->GetKey();
    auto& board_sensors = table[board_key];
    board_sensors[sensor_key] = std::move(entry);
  }
  return table;
}
// NIC sensors are not refreshed one by one. Instead a single scheduler task
// posts a DoRefresh() call for every NIC accessor onto a dedicated io_context
// registered under kDefaultNicTelemetryRefreshContext. Nothing is registered
// or scheduled when both accessor tables are empty.
//
// The task keeps raw accessor pointers, so the accessors in both tables must
// stay alive for as long as the task can run.
void ScheduleNicSensorAccessorRefresh(
    const absl::flat_hash_map<int, std::unique_ptr<nic_veeprom::Accessor>>&
        bus_to_v1_accessor,
    const absl::flat_hash_map<int, std::unique_ptr<nic_veeprom::Accessor>>&
        bus_to_v2_accessor,
    absl::flat_hash_map<std::string, std::shared_ptr<boost::asio::io_context>>&
        sensor_group_to_io_context,
    ThreadManager& thread_manager) {
  if (bus_to_v1_accessor.empty() && bus_to_v2_accessor.empty()) {
    return;
  }

  // Always install a fresh io_context for the refresh work.
  std::shared_ptr<boost::asio::io_context> refresh_context =
      std::make_shared<boost::asio::io_context>();
  sensor_group_to_io_context[kDefaultNicTelemetryRefreshContext] =
      refresh_context;

  std::vector<nic_veeprom::Accessor*> all_accessors;
  for (const auto& bus_and_accessor : bus_to_v1_accessor) {
    all_accessors.push_back(bus_and_accessor.second.get());
  }
  for (const auto& bus_and_accessor : bus_to_v2_accessor) {
    all_accessors.push_back(bus_and_accessor.second.get());
  }

  thread_manager.task_scheduler->RunAndScheduleAsync(
      [accessors = std::move(all_accessors),
       io_context = std::move(refresh_context)](
          absl::AnyInvocable<void()> on_done) mutable {
        for (nic_veeprom::Accessor* accessor : accessors) {
          boost::asio::post(*io_context,
                            [accessor]() { accessor->DoRefresh(); });
        }
        on_done();
      },
      absl::Seconds(1), absl::Seconds(10));
}
} // namespace
// Dumps every sensor as JSON, grouped by board config key:
//   { <board config key>: { <sensor key>: { "Value", "Metrics",
//                                           "StaticAttributes",
//                                           "DynamicAttributes" } } }
// Proto field names are preserved. A sensor without data (or whose data cannot
// be serialized) is omitted. If a later section fails to serialize, the
// sections written so far are kept and the remaining ones are left out.
nlohmann::json SensorCollector::ToJson() const {
  ::google::protobuf::util::JsonPrintOptions print_options;
  print_options.preserve_proto_field_names = true;

  // Serializes `message` into `out` (cleared first); returns false on failure.
  const auto to_json_text = [&print_options](const auto& message,
                                             std::string& out) {
    out.clear();
    return ::google::protobuf::json::MessageToJsonString(message, &out,
                                                         print_options)
        .ok();
  };

  nlohmann::json::object_t response;
  for (const auto& [board_config_key, key_to_sensor] : sensor_table_) {
    nlohmann::json::object_t board_sensors;
    for (const auto& [key, sensor] : key_to_sensor) {
      std::string text;
      const std::shared_ptr<const SensorValue> reading =
          sensor->GetSensorData();
      if (reading == nullptr || !to_json_text(*reading, text)) {
        LOG(ERROR) << "Failed to get sensor data for " << key;
        continue;
      }
      // The entry is only created once the reading serialized successfully.
      nlohmann::json& entry = board_sensors[key];
      entry["Value"] = nlohmann::json::parse(text, nullptr, false);

      if (!to_json_text(sensor->GetSensorMetrics(), text)) {
        LOG(ERROR) << "Failed to get sensor metrics for " << key;
        continue;
      }
      entry["Metrics"] = nlohmann::json::parse(text, nullptr, false);

      if (!to_json_text(sensor->GetSensorAttributesStatic(), text)) {
        LOG(ERROR) << "Failed to convert SensorStaticAttributes to JSON";
        continue;
      }
      entry["StaticAttributes"] = nlohmann::json::parse(text, nullptr, false);

      if (!to_json_text(sensor->GetSensorAttributesDynamic(), text)) {
        LOG(ERROR) << "Failed to convert SensorDynamicAttributes to JSON";
        continue;
      }
      entry["DynamicAttributes"] = nlohmann::json::parse(text, nullptr, false);
    }
    response[board_config_key] = board_sensors;
  }
  return response;
}
// Constructs a collector from already-created parts.
//
// Takes ownership of `sensors` (re-indexed by board config key and sensor key
// through CreateSensorsTable), of both NIC accessor tables, of
// `thread_manager` and of `peci_scanner` (which may be null).
// `refresh_notification` and `thread_factory` are stored as raw pointers and
// are not owned.
// NOTE(review): presumably both raw pointers must outlive the collector —
// confirm against the header/callers.
SensorCollector::SensorCollector(
std::vector<std::shared_ptr<Sensor>>&& sensors,
absl::flat_hash_map<int, std::unique_ptr<nic_veeprom::Accessor>>&&
bus_to_v1_accessor,
absl::flat_hash_map<int, std::unique_ptr<nic_veeprom::Accessor>>&&
bus_to_v2_accessor,
std::unique_ptr<ThreadManager> thread_manager,
const SensorNotification* refresh_notification,
std::optional<int> override_sensor_sampling_interval_ms,
ecclesia::ThreadFactoryInterface* thread_factory,
std::unique_ptr<PeciScanner> peci_scanner)
: bus_to_v1_accessor_(std::move(bus_to_v1_accessor)),
bus_to_v2_accessor_(std::move(bus_to_v2_accessor)),
sensor_table_(CreateSensorsTable(std::move(sensors))),
thread_manager_(std::move(thread_manager)),
peci_scanner_(std::move(peci_scanner)),
refresh_notification_(refresh_notification),
override_sensor_sampling_interval_ms_(
override_sensor_sampling_interval_ms),
thread_factory_(thread_factory) {}
// Shuts collection down in a fixed order: stop the task scheduler, stop every
// io_context, join every thread, then destroy the PECI scanner before the
// thread manager. The scanner is given a raw pointer to the thread manager's
// task scheduler in InitializePeciScanner, which is why it is released first.
SensorCollector::~SensorCollector() {
// In the case of EmptySensorCollector, avoid dereferencing nullptr.
if (thread_manager_ == nullptr) {
return;
}
// Stop the scheduler first.
thread_manager_->task_scheduler->Stop();
// Stop io_contexts.
for (const std::shared_ptr<boost::asio::io_context>& io_context :
thread_manager_->io_contexts) {
io_context->stop();
}
// Finally join all threads.
for (const std::unique_ptr<ecclesia::ThreadInterface>& thread :
thread_manager_->threads) {
thread->Join();
}
// Release the scanner while the scheduler it points to still exists.
peci_scanner_.reset();
thread_manager_.reset();
}
// Returns the keys of all sensors registered under `board_config_key`, sorted
// ascending. Returns an empty list when the board config key is unknown.
std::vector<std::string> SensorCollector::GetAllSensorKeysByConfigKey(
    const std::string& board_config_key) const {
  std::vector<std::string> keys;
  const auto board = sensor_table_.find(board_config_key);
  if (board == sensor_table_.end()) {
    return keys;
  }
  for (const auto& key_and_sensor : board->second) {
    keys.push_back(key_and_sensor.first);
  }
  // Hash-map iteration order is unspecified, so sort for a stable result.
  std::sort(keys.begin(), keys.end());
  return keys;
}
void SensorCollector::ReinitializeAndScheduleAllSensorsForConfigKey(
const std::string& board_config_key) {
auto it = sensor_table_.find(board_config_key);
if (it == sensor_table_.end()) {
LOG(WARNING) << "No sensors found for config key: " << board_config_key
<< ". Skipping reinitialization.";
return;
}
for (const auto& [key, sensor] : it->second) {
sensor->Reinitialize([this, sensor, key](absl::Status status) {
if (!status.ok()) {
LOG(WARNING) << "Failed to reinitialize sensor: " << key
<< " with status: " << status;
return;
}
ScheduleIndividualSensorRead(sensor, refresh_notification_,
override_sensor_sampling_interval_ms_,
*thread_manager_);
});
}
}
void SensorCollector::StartCollection() {
for (const auto& io_context : thread_manager_->io_contexts) {
thread_manager_->threads.push_back(
thread_factory_->New([io_context]() { io_context->run(); }));
}
}
// Returns the sensor registered under `board_config_key` with key
// `sensor_key`, or nullptr when either key is unknown.
std::shared_ptr<const Sensor> SensorCollector::GetSensorByConfigKeyAndSensorKey(
    const std::string& board_config_key, const std::string& sensor_key) const {
  const auto board = sensor_table_.find(board_config_key);
  if (board != sensor_table_.end()) {
    const auto& key_to_sensor = board->second;
    if (const auto match = key_to_sensor.find(sensor_key);
        match != key_to_sensor.end()) {
      return match->second;
    }
  }
  return nullptr;
}
// Returns every sensor of every board config, ordered by sensor key.
std::vector<std::shared_ptr<const Sensor>> SensorCollector::GetAllSensors()
    const {
  std::vector<std::shared_ptr<const Sensor>> result;
  for (const auto& board : sensor_table_) {
    for (const auto& key_and_sensor : board.second) {
      result.push_back(key_and_sensor.second);
    }
  }
  const auto by_key = [](const std::shared_ptr<const Sensor>& lhs,
                         const std::shared_ptr<const Sensor>& rhs) {
    return lhs->GetKey() < rhs->GetKey();
  };
  std::sort(result.begin(), result.end(), by_key);
  return result;
}
// Searches every board config for a sensor with key `sensor_key` and returns
// the first match, or nullptr when no board has such a sensor.
std::shared_ptr<const Sensor> SensorCollector::GetSensorBySensorKey(
    const std::string& sensor_key) const {
  for (const auto& board : sensor_table_) {
    const auto& key_to_sensor = board.second;
    if (const auto match = key_to_sensor.find(sensor_key);
        match != key_to_sensor.end()) {
      return match->second;
    }
  }
  return nullptr;
}
// Forwards `value` to WriteReading() of the first sensor found with key
// `sensor_key` and returns its status. Returns NotFoundError when no board
// config contains such a sensor.
absl::Status SensorCollector::WriteToSensor(const std::string& sensor_key,
                                            const SensorValue& value) {
  for (const auto& board : sensor_table_) {
    const auto& key_to_sensor = board.second;
    if (const auto match = key_to_sensor.find(sensor_key);
        match != key_to_sensor.end()) {
      return match->second->WriteReading(value);
    }
  }
  return absl::NotFoundError(absl::StrCat("Sensor not found: ", sensor_key));
}
// Applies `config` (sampling interval and buffer size) to sensors.
//
// * `config.key` empty: every sensor of every board config is configured.
//   Sensors that are pending creation, or that have no scheduled read task,
//   are skipped with a warning. Returns OK if at least one sensor was
//   configured, NotFoundError otherwise.
// * `config.key` set: the first sensor with that key is configured. Returns
//   OK on success or if the sensor is pending creation, FailedPreconditionError
//   if the sensor has no scheduled read task, and NotFoundError if no such
//   sensor exists.
//
// A sensor has no scheduled task when ScheduleIndividualSensorRead skipped it
// (status STATUS_CREATION_FAILED or STATUS_CREATION_PENDING). The task id is
// therefore looked up with find() rather than at(), which would fail on a
// missing key.
absl::Status SensorCollector::ConfigureCollection(const Config& config) const {
  // Picks the sampling interval: the configured one if positive, else the
  // sensor's static refresh interval if positive, else the default.
  auto get_sampling_interval = [](const std::shared_ptr<Sensor>& sensor,
                                  const Config& config) {
    if (config.sampling_interval_ms > 0) {
      return absl::Milliseconds(config.sampling_interval_ms);
    }
    absl::Duration static_refresh_interval = DecodeGoogleApiProto(
        sensor->GetSensorAttributesStatic().static_refresh_interval());
    if (static_refresh_interval > absl::ZeroDuration()) {
      LOG(WARNING) << "Using static refresh interval: "
                   << static_refresh_interval
                   << " for sensor: " << sensor->GetKey();
      return static_refresh_interval;
    }
    LOG(WARNING) << "Using default sensor sampling interval: "
                 << kDefaultSensorSamplingInterval
                 << " for sensor: " << sensor->GetKey();
    return kDefaultSensorSamplingInterval;
  };

  // Updates the task period, buffer size and sampling interval of one sensor.
  // Returns false (changing nothing) when the sensor has no scheduled task.
  auto configure_sensor = [this, &config, &get_sampling_interval](
                              const auto& key,
                              const std::shared_ptr<Sensor>& sensor) -> bool {
    const absl::Duration interval = get_sampling_interval(sensor, config);
    {
      absl::MutexLock lock(&thread_manager_->key_to_task_id_mutex);
      const auto task_it = thread_manager_->sensor_key_to_task_id.find(key);
      if (task_it == thread_manager_->sensor_key_to_task_id.end()) {
        LOG(WARNING) << "Skipping configuration for sensor " << key
                     << " because it has no scheduled read task.";
        return false;
      }
      thread_manager_->task_scheduler->UpdateTaskPeriod(task_it->second,
                                                        interval);
    }
    ResizeBufferAndMetrics(sensor, config.max_batch_size);
    sensor->UpdateSensorSamplingInterval(interval);
    return true;
  };

  const auto is_pending = [](const std::shared_ptr<Sensor>& sensor) {
    return sensor->GetSensorAttributesDynamic().state().status() ==
           STATUS_CREATION_PENDING;
  };

  // Empty key: configure all sensors of all board configs.
  if (config.key.empty()) {
    bool any_sensor_configured = false;
    for (const auto& [_, key_to_sensor] : sensor_table_) {
      for (const auto& [key, sensor] : key_to_sensor) {
        if (is_pending(sensor)) {
          LOG(WARNING) << "Skipping configuration for sensor " << key
                       << " because it is pending creation.";
          continue;
        }
        if (configure_sensor(key, sensor)) {
          any_sensor_configured = true;
        }
      }
    }
    return any_sensor_configured
               ? absl::OkStatus()
               : absl::NotFoundError("No sensor found for configuration");
  }

  // Non-empty key: configure the first sensor with that key.
  for (const auto& [_, key_to_sensor] : sensor_table_) {
    const auto it = key_to_sensor.find(config.key);
    if (it == key_to_sensor.end()) {
      continue;
    }
    if (is_pending(it->second)) {
      LOG(WARNING) << "Skipping configuration for sensor " << config.key
                   << " because it is pending creation.";
      return absl::OkStatus();
    }
    if (!configure_sensor(config.key, it->second)) {
      return absl::FailedPreconditionError(absl::StrCat(
          "Sensor has no scheduled read task: ", config.key));
    }
    return absl::OkStatus();
  }
  return absl::NotFoundError("No sensor found for configuration");
}
void SensorCollector::ReinitializeSensorByConfigKeyAndSensorKey(
const std::string& board_config_key, const std::string& sensor_key) const {
auto board_it = sensor_table_.find(board_config_key);
if (board_it == sensor_table_.end()) {
LOG(WARNING) << "No sensors found for config key: " << board_config_key
<< ". Skipping reinitialization.";
return;
}
auto sensor_it = board_it->second.find(sensor_key);
if (sensor_it == board_it->second.end()) {
LOG(WARNING) << "Failed to find sensor " << sensor_key << " in config "
<< board_config_key;
return;
}
sensor_it->second->Reinitialize(
[this, sensor_it, sensor_key](absl::Status status) {
if (!status.ok()) {
LOG(WARNING) << "Failed to reinitialize sensor: " << sensor_key
<< " with status: " << status;
return;
}
ScheduleIndividualSensorRead(sensor_it->second, refresh_notification_,
override_sensor_sampling_interval_ms_,
*thread_manager_);
});
}
// Builds a fully wired SensorCollector from `params`.
//
// Steps, in order:
//   1. Create hwmon, PSU, fan (controllers, then PWM/tach), shared-memory,
//      virtual, Intel CPU and NIC sensors. Virtual sensors are created before
//      Intel CPU and NIC sensors, so their expressions can only reference the
//      sensor kinds created ahead of them.
//   2. Schedule the periodic NIC accessor refresh task.
//   3. Register a work guard and the io_context of every sensor group with the
//      thread manager.
//   4. Schedule periodic reads for all sensors.
//   5. Construct the collector and, if any Intel CPU sensor config exists,
//      create and start the PECI scanner.
// Returns the first error from any creation step. Threads are not started
// here; see StartCollection().
absl::StatusOr<std::unique_ptr<SensorCollector>> SensorCollector::Create(
const Params& params) {
std::vector<std::shared_ptr<Sensor>> sensors;
auto thread_manager = std::make_unique<ThreadManager>(params.clock);
// Maps a sensor group name to the io_context shared by that group.
absl::flat_hash_map<std::string, std::shared_ptr<boost::asio::io_context>>
sensor_group_to_io_context;
ECCLESIA_RETURN_IF_ERROR(
CreateHwmonSensors(params, sensors, sensor_group_to_io_context));
ECCLESIA_RETURN_IF_ERROR(
CreatePsuSensors(params, sensors, sensor_group_to_io_context));
ECCLESIA_ASSIGN_OR_RETURN(
std::vector<std::shared_ptr<FanController>> fan_controllers,
CreateFanControllers(params));
ECCLESIA_RETURN_IF_ERROR(CreateFanSensors(params, fan_controllers, sensors,
sensor_group_to_io_context));
// Shared-memory sensors register their io_context with the thread manager
// directly instead of going through sensor_group_to_io_context.
ECCLESIA_RETURN_IF_ERROR(
CreateSharedMemSensors(params, sensors, *thread_manager));
ECCLESIA_RETURN_IF_ERROR(
CreateVirtualSensors(params, sensors, sensor_group_to_io_context));
ECCLESIA_RETURN_IF_ERROR(
CreateIntelCpuSensors(params, sensors, sensor_group_to_io_context));
absl::flat_hash_map<int, std::unique_ptr<nic_veeprom::Accessor>>
bus_to_v1_accessor;
absl::flat_hash_map<int, std::unique_ptr<nic_veeprom::Accessor>>
bus_to_v2_accessor;
ECCLESIA_RETURN_IF_ERROR(
CreateNicSensors(params, sensors, sensor_group_to_io_context,
bus_to_v1_accessor, bus_to_v2_accessor));
// Must run before the loop below so that the refresh io_context it registers
// is also handed to the thread manager.
ScheduleNicSensorAccessorRefresh(bus_to_v1_accessor, bus_to_v2_accessor,
sensor_group_to_io_context, *thread_manager);
for (const auto& [sensor_group, io_context] : sensor_group_to_io_context) {
LOG(INFO) << "Starting io_context thread for sensor group: "
<< sensor_group;
boost::asio::executor_work_guard<boost::asio::io_context::executor_type>
work_guard(boost::asio::make_work_guard(*io_context));
thread_manager->work_guards.push_back(std::move(work_guard));
thread_manager->io_contexts.push_back(io_context);
}
ScheduleAllSensorReads(sensors, params.refresh_notification,
params.override_sensor_sampling_interval_ms,
*thread_manager);
// Create the SensorCollector instance first.
auto collector = absl::WrapUnique(new SensorCollector(
std::move(sensors), std::move(bus_to_v1_accessor),
std::move(bus_to_v2_accessor), std::move(thread_manager),
params.refresh_notification, params.override_sensor_sampling_interval_ms,
params.thread_factory,
/*peci_scanner=*/nullptr));
if (!params.sensor_configs.intel_cpu_sensor_configs.empty()) {
// Initialize the PeciScanner after the SensorCollector is created,
// so the callback can capture 'this'.
collector->InitializePeciScanner(params);
collector->peci_scanner_->StartScan();
}
return collector;
}
void SensorCollector::InitializePeciScanner(const Params& params) {
peci_scanner_ = PeciScanner::Create(
params.sensor_configs.intel_cpu_sensor_configs, params.peci_sysfs,
params.peci_access, thread_manager_->task_scheduler.get(),
[this](const std::string& board_config_key,
const std::string& sensor_key) {
this->ReinitializeSensorByConfigKeyAndSensorKey(board_config_key,
sensor_key);
});
}
// Creates a default-constructed EmptySensorCollector.
std::unique_ptr<EmptySensorCollector> EmptySensorCollector::Create() {
  auto collector = std::make_unique<EmptySensorCollector>();
  return collector;
}
// Placeholder implementation: logs a warning and always returns an empty list,
// regardless of `board_config_key`.
std::vector<std::string> EmptySensorCollector::GetAllSensorKeysByConfigKey(
    const std::string& board_config_key) const {
  LOG(WARNING) << "EmptySensorCollector::GetAllSensorKeysByConfigKey is called. This will return an empty list.";
  return std::vector<std::string>();
}
// Placeholder implementation: logs a warning and always returns an empty list.
std::vector<std::shared_ptr<const Sensor>> EmptySensorCollector::GetAllSensors()
    const {
  // The warning names this method so the log identifies the real call site.
  LOG(WARNING) << "EmptySensorCollector::GetAllSensors is "
                  "called. This will return an empty list.";
  return {};
}
// Placeholder implementation: logs a warning and always returns nullptr,
// regardless of `sensor_key`.
std::shared_ptr<const Sensor> EmptySensorCollector::GetSensorBySensorKey(
    const std::string& sensor_key) const {
  LOG(WARNING) << "EmptySensorCollector::GetSensorBySensorKey is called. This will return a nullptr.";
  return std::shared_ptr<const Sensor>();
}
// Placeholder implementation: always returns UnimplementedError and ignores
// `config`.
absl::Status EmptySensorCollector::ConfigureCollection(
    const Config& config) const {
  constexpr absl::string_view kMessage =
      "EmptySensorCollector::ConfigureCollection is called. This is not implemented.";
  return absl::UnimplementedError(kMessage);
}
// Placeholder implementation: logs a warning and always returns nullptr,
// regardless of the keys passed in.
std::shared_ptr<const Sensor>
EmptySensorCollector::GetSensorByConfigKeyAndSensorKey(
    const std::string& board_config_key, const std::string& sensor_key) const {
  LOG(WARNING) << "EmptySensorCollector::GetSensorByConfigKeyAndSensorKey is called. This will return a nullptr.";
  return std::shared_ptr<const Sensor>();
}
// Placeholder implementation: logs a warning and always returns
// UnimplementedError; nothing is written.
absl::Status EmptySensorCollector::WriteToSensor(const std::string& sensor_key,
                                                 const SensorValue& value) {
  LOG(WARNING) << "EmptySensorCollector::WriteToSensor is called. This will return an error.";
  constexpr absl::string_view kMessage =
      "EmptySensorCollector::WriteToSensor is called. This is not implemented.";
  return absl::UnimplementedError(kMessage);
}
// Returns a JSON object with a single "Warning" entry stating that the empty
// collector is in use.
nlohmann::json EmptySensorCollector::GetSchedulerStats() const {
  nlohmann::json stats = nlohmann::json::object();
  stats["Warning"] = "EmptySensorCollector used.";
  return stats;
}
// Returns a JSON object with a single "Warning" entry stating that the empty
// collector is in use.
nlohmann::json EmptySensorCollector::ToJson() const {
  nlohmann::json dump = nlohmann::json::object();
  dump["Warning"] = "EmptySensorCollector used.";
  return dump;
}
// Placeholder implementation: only logs a warning; no threads are started.
void EmptySensorCollector::StartCollection() {
  LOG(WARNING) << "EmptySensorCollector::StartCollection is called. This does nothing.";
}
} // namespace milotic_tlbmc