#include "cper/manager.h"
#include <sys/stat.h>
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <fstream>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_map.h"
#include "absl/functional/any_invocable.h"
#include "absl/functional/bind_front.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "nlohmann/json_fwd.hpp"
#include "cper/cper_event.h"
#include "proxy.h"
#include "proxy_config.pb.h"
#include "redfish_resource.h"
#include "sse_plugin/event.h"
#include "sse_plugin/events_manager.h"
#include "vendor_events.pb.h"
#include "utils/std_thread.h"
#include "utils/status_macros.h"
#include "utils/thread.h"
#include "voyager/priority_queue.hpp"
namespace milotic {
namespace {
constexpr int kVlogVerbosity = 1;
constexpr absl::string_view kSystemsUri = "/redfish/v1/Systems";
constexpr absl::string_view kDefaultPath = "/run/grace-cpu";
constexpr absl::string_view kDefaultFilename = "error-count.json";
constexpr absl::string_view kDefaultMemoryErrorFieldName =
"IndeterminateUncorrectableErrorCount";
constexpr absl::string_view kDefaultCpuErrorFieldName =
"UncorrectableOtherErrorCount";
} // namespace
// Processes an event from the RMC: updates the machine state based on the
// event severity and persists the event, logging any storage failure.
// If the plugin is starting, also restarts init_wait_timer_.
absl::Status CperEventsManager::ProcessEvent(voyager::Job& job) {
Event& event = static_cast<EventJob&>(job).event();
VLOG(kVlogVerbosity) << "ProcessEvent id: " << event.event_id();
if (event.cper_event_data() && event.cper_event_data()->memory_errors > 0) {
RemapMemoryOrigin(*event.cper_event_data());
}
machine_state_->UpdateStatus(event, /*is_replay=*/false);
// SaveCperEvent is called for CPER events.
if (event.is_cper_event()) {
if (absl::Status status = storage_manager_->SaveCperEvent(event);
!status.ok()) {
if (absl::IsCancelled(status)) {
LOG(ERROR)
<< "Got cancelled status from storage manager, disconnecting: "
<< status;
storage_manager_->Disconnect();
return absl::OkStatus();
}
LOG(ERROR) << "SaveCperEvent failed: " << status;
}
}
// SaveEvent is called for all events.
if (absl::Status status = storage_manager_->SaveEvent(event); !status.ok()) {
if (absl::IsCancelled(status)) {
LOG(ERROR) << "Got cancelled status from storage manager, disconnecting";
storage_manager_->Disconnect();
return absl::OkStatus();
}
LOG(ERROR) << "SaveEvent failed: " << status;
}
return absl::OkStatus();
}
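// Parses a Redfish SSE payload into CPER events. The sse_id is unused here;
// parsing is delegated to CperEvent::ParseRedfishEvent.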
absl::StatusOr<std::vector<Event>> CperEventsManager::ParseSSEvent(
absl::string_view sse_json, absl::string_view sse_id) {
return CperEvent::ParseRedfishEvent(sse_json);
}
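// Handles expiry of the initial wait period: moves the plugin state to
// kServing, ends the serving-state timer, and marks the machine healthy.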
absl::Status CperEventsManager::ProcessInitExpiry(voyager::Job& job) {
VLOG(kVlogVerbosity) << "ProcessInitExpiry";
SetState(PluginState::kServing, PluginState::kStreamStarted);
serving_state_timer_.End();
machine_state_->MarkHealthy();
return absl::OkStatus();
}
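// Returns the string value of `field` in `resource`, or NotFound /
// InvalidArgument if the field is missing or is not a string.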
static absl::StatusOr<std::string> GetStringField(
const nlohmann::json& resource, absl::string_view field) {
auto it = resource.find(field);
if (it == resource.end()) {
return absl::NotFoundError(absl::StrCat("Field not found: ", field));
}
const auto* value = it->get_ptr<const std::string*>();
if (value == nullptr) {
return absl::InvalidArgumentError("Value is not a string");
}
return *value;
}
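// Returns the Location.PartLocationContext string of `resource`, or an error
// if the Location object or the field is missing or not a string.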
static absl::StatusOr<std::string> GetPartLocationContext(
absl::string_view odata_id, const nlohmann::json& resource) {
auto location = resource.find("Location");
if (location == resource.end()) {
return absl::NotFoundError("Location not found");
}
return GetStringField(*location, "PartLocationContext");
}
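// Maps each processor linked from the memory resource (Links.Processors) to
// the memory's odata id in `memory_origin_remapping`. Returns true if usable
// links were present (even if some entries failed to parse), false otherwise.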
static bool UpdateMappingFromLinks(
absl::string_view odata_id,
absl::flat_hash_map<std::string, std::string>& memory_origin_remapping,
const nlohmann::json& memory) {
auto links = memory.find("Links");
if (links == memory.end()) {
LOG(WARNING) << "No links found in memory resource " << odata_id;
return false;
}
auto processors = links->find("Processors");
if (processors == links->end()) {
LOG(WARNING) << "No processors found in links in " << odata_id;
return false;
}
if (processors->empty()) {
LOG(WARNING) << "Empty processors list found in links in " << odata_id;
return false;
}
for (const auto& processor : *processors) {
absl::StatusOr<std::string> processor_id =
GetStringField(processor, "@odata.id");
if (!processor_id.ok()) {
LOG(ERROR) << "Could not parse processor link in " << odata_id << ": "
<< processor_id.status() << ":" << processor.dump(2);
continue;
}
auto [it, inserted] =
memory_origin_remapping.try_emplace(*processor_id, odata_id);
if (!inserted) {
LOG(ERROR) << odata_id << ": Memory origin remapping already exists for "
<< *processor_id << " to " << it->second;
}
}
return true;
}
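// Processes one memory resource: if it has processor links, populates
// `memory_origin_remapping` directly; otherwise records its
// Location.PartLocationContext (with a trailing "/CPU_0" stripped) in
// `plc_to_memory` for the processor pass to match against.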
static void UpdatePartLocationContextMap(
absl::flat_hash_map<std::string, std::string>& memory_origin_remapping,
absl::flat_hash_map<std::string, std::string>& plc_to_memory,
const nlohmann::json& memory) {
absl::StatusOr<std::string> odata_id = GetStringField(memory, "@odata.id");
if (!odata_id.ok()) {
LOG(ERROR) << "Coud not parse memory resource: " << odata_id.status() << ":"
<< memory.dump(2);
return;
}
// If the memory resource has processor links, we can build the mapping
// directly from them.
if (UpdateMappingFromLinks(*odata_id, memory_origin_remapping, memory)) {
return;
}
absl::StatusOr<std::string> plc = GetPartLocationContext(*odata_id, memory);
if (!plc.ok()) {
LOG(WARNING) << "Skipping memory resource: " << plc.status();
return;
}
constexpr absl::string_view kCpuSuffix = "/CPU_0";
if (plc->ends_with(kCpuSuffix)) {
plc->erase(plc->size() - kCpuSuffix.size());
} else {
LOG(WARNING) << "PartLocationContext does not end with " << kCpuSuffix
<< ": " << *plc;
}
plc_to_memory[*plc] = *odata_id;
}
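// Processes one processor resource: looks up its PartLocationContext in
// `plc_to_memory` and, on a match, maps the processor's odata id to the
// corresponding memory odata id in `memory_origin_remapping`.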
static void UpdateMemoryOriginRemapping(
absl::flat_hash_map<std::string, std::string>& memory_origin_remapping,
absl::flat_hash_map<std::string, std::string>& plc_to_memory,
const nlohmann::json& processor) {
absl::StatusOr<std::string> odata_id = GetStringField(processor, "@odata.id");
if (!odata_id.ok()) {
LOG(ERROR) << "Could not parse processor resource: " << odata_id.status()
<< ":" << processor.dump();
return;
}
absl::StatusOr<std::string> plc =
GetPartLocationContext(*odata_id, processor);
if (!plc.ok()) {
LOG(WARNING) << "Skipping processor resource: " << plc.status();
return;
}
auto it = plc_to_memory.find(*plc);
if (it == plc_to_memory.end()) {
LOG(WARNING) << "No memory with PartLocationContext starting with " << *plc;
return;
}
LOG(INFO) << "Remapping memory origin " << *odata_id << " to " << it->second;
// We assume there is only one memory per CPU (true for Stellaris), so we can
// safely move the value out of the map and erase it. If there are multiple
// memories per CPU, we return with a warning in the previous if statement.
memory_origin_remapping[*odata_id] = std::move(it->second);
plc_to_memory.erase(it);
}
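// Invokes `callback` on each entry of the collection's "Members" array, and
// logs an error if the array is missing.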
static void IterateCollection(
const nlohmann::json& collection,
absl::AnyInvocable<void(const nlohmann::json&)> callback) {
auto members = collection.find("Members");
if (members == collection.end()) {
LOG(ERROR) << "No members found in collection: " << collection.dump(2);
return;
}
for (const auto& item : *members) {
callback(item);
}
}
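// Builds memory_origin_remapping_ for every system in `systems`: the first
// pass over the Memory collections maps processor links (or records
// PartLocationContexts), and the second pass over the Processors collections
// resolves each processor's PartLocationContext to its memory odata id.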
absl::Status CperEventsManager::InitRemapping(const nlohmann::json& systems) {
// Maps PartLocationContext to memory odata id.
absl::flat_hash_map<std::string, std::string> plc_to_memory;
auto systems_members = systems.find("Members");
if (systems_members == systems.end()) {
return absl::NotFoundError("No members found in systems");
}
// First, we need to get the mapping from PartLocationContext to memory
// odata id.
for (const auto& system : *systems_members) {
ASSIGN_OR_RETURN(std::string system_id,
GetStringField(system, "@odata.id"));
ASSIGN_OR_RETURN(
nlohmann::json memories,
GetRedfishResource(proxy_, absl::StrCat(system_id, "/Memory"),
{.expand_non_links = true}));
IterateCollection(memories,
absl::bind_front(UpdatePartLocationContextMap,
std::ref(memory_origin_remapping_),
std::ref(plc_to_memory)));
}
// Then, we need to use the processor PartLocationContext to look up the
// memory odata id from the previous map.
for (const auto& system : *systems_members) {
ASSIGN_OR_RETURN(std::string system_id,
GetStringField(system, "@odata.id"));
ASSIGN_OR_RETURN(
nlohmann::json processors,
GetRedfishResource(proxy_, absl::StrCat(system_id, "/Processors"),
{.expand_non_links = true}));
IterateCollection(processors,
absl::bind_front(UpdateMemoryOriginRemapping,
std::ref(memory_origin_remapping_),
std::ref(plc_to_memory)));
}
return absl::OkStatus();
}
CperEventsManager::~CperEventsManager() {
// Make sure there is no in-flight remapping before destruction.
shutdown_.Notify();
remapping_ready_.WaitForNotification();
}
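// Schedules an asynchronous fetch of the Systems collection (after `delay`)
// and builds the memory origin remapping from it, retrying with
// remapping_retry_delay_ on failure. Notifies remapping_ready_ once the
// mapping is ready or will never be populated.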
void CperEventsManager::GetSystemsForRemapping(absl::Duration delay) {
if (shutdown_.HasBeenNotified()) {
LOG(INFO) << "Shutdown, skipping memory origin remapping";
remapping_ready_.Notify();
return;
}
// This needs to be deferred to when the queue starts processing.
absl::Status status = GetRedfishResource(
proxy_, kSystemsUri,
[this](absl::StatusOr<nlohmann::json> systems) {
if (!systems.ok()) {
LOG(ERROR) << "Failed to get systems for remapping: "
<< systems.status();
GetSystemsForRemapping(remapping_retry_delay_);
return;
}
if (shutdown_.HasBeenNotified()) {
LOG(INFO) << "Shutdown, skipping memory origin remapping";
remapping_ready_.Notify();
return;
}
absl::Status status = InitRemapping(*std::move(systems));
if (!status.ok()) {
LOG(ERROR) << "Failed to update memory origin remapping: " << status;
GetSystemsForRemapping(remapping_retry_delay_);
return;
}
LOG(INFO) << "Memory origin remapping ready";
remapping_ready_.Notify();
},
/*expand=*/{}, delay);
if (!status.ok()) {
LOG(ERROR) << "Failed to schedule memory origin remapping: " << status;
remapping_ready_.Notify(); // Notify since it will never be populated.
return;
}
}
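// Forwards the proxy to the base class and kicks off the asynchronous memory
// origin remapping.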
void CperEventsManager::SetProxy(Proxy* proxy) {
BaseEventsManager::SetProxy(proxy);
LOG(INFO) << "Getting memory origin remapping";
GetSystemsForRemapping();
}
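// Archives already-processed events, waits (best effort) for the memory origin
// remapping to become ready, then delegates to the base class to pull events
// from the RMC.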
void CperEventsManager::PullEventsFromRMC() {
// Archive existing events before pulling new ones. We no longer need them:
// machdocd has already updated the error counters based on these events.
LOG(INFO) << "Archiving processed events in storage manager";
absl::Status status = storage_manager_->ArchiveProcessedEvents();
if (!status.ok()) {
LOG(ERROR) << "Failed to archive processed events, status: " << status;
}
if (!remapping_ready_.WaitForNotificationWithTimeout(
remapping_ready_timeout_)) {
LOG(ERROR) << "Memory origin remapping not ready after "
<< remapping_ready_timeout_;
// This is best effort; in case of failure, we will just log an error and
// continue.
}
BaseEventsManager::PullEventsFromRMC();
}
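// Rewrites the event's origin from a processor odata id to the mapped memory
// odata id when the remapping is ready and contains an entry; otherwise the
// origin is left unchanged.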
void CperEventsManager::RemapMemoryOrigin(
Event::CperEventData& cper_event_data) const {
if (cper_event_data.origin.empty()) {
return;
}
LOG(INFO) << "Remapping memory origin " << cper_event_data.origin;
if (!remapping_ready_.HasBeenNotified()) {
// In case the mapping is not ready, we will just log an error and continue
// with the original origin so that the event is still reported quickly.
LOG(ERROR) << "Memory origin remapping not ready";
return;
}
auto it = memory_origin_remapping_.find(cper_event_data.origin);
if (it == memory_origin_remapping_.end()) {
LOG(WARNING) << "No origin remapping found for memory error: "
<< cper_event_data.origin;
return;
}
LOG(INFO) << "Remapping memory origin " << cper_event_data.origin << " to "
<< it->second;
cper_event_data.origin = it->second;
}
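// Configures the counter file paths and field names from `config`, falling
// back to the defaults above, creates the counter directory unless
// path_must_exist is set, and starts the thread that persists counters.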
CperMachineState::CperMachineState(
const milotic_grpc_proxy::Plugin::CperEvents::CounterFile& config)
: memory_error_field_name_(kDefaultMemoryErrorFieldName),
cpu_error_field_name_(kDefaultCpuErrorFieldName) {
absl::string_view filename = kDefaultFilename;
absl::string_view path = kDefaultPath;
if (!config.path().empty()) {
path = config.path();
}
if (!config.filename().empty()) {
filename = config.filename();
}
counter_file_path_ = absl::StrCat(path, "/", filename);
if (config.temp_filename().empty()) {
// Default to the same filename with a .tmp suffix.
counter_file_temp_path_ = absl::StrCat(counter_file_path_, ".tmp");
} else {
counter_file_temp_path_ = absl::StrCat(path, "/", config.temp_filename());
}
if (!config.memory_error_field_name().empty()) {
memory_error_field_name_ = config.memory_error_field_name();
}
if (!config.cpu_error_field_name().empty()) {
cpu_error_field_name_ = config.cpu_error_field_name();
}
if (config.save_retry_interval_ms() > 0) {
save_retry_interval_ = absl::Milliseconds(config.save_retry_interval_ms());
}
LOG(INFO) << "Initializing counters in " << counter_file_path_;
if (!config.path_must_exist()) {
LOG(INFO) << "Creating directory: " << path;
if (mkdir(std::string(path).c_str(), 0755) != 0) {
if (errno == EEXIST) {
LOG(INFO) << "Directory already exists: " << path;
} else {
LOG(ERROR) << "Failed to create directory: " << strerror(errno);
}
}
}
// Counters are saved to a file in a separate thread. This allows the main
// thread to process events without blocking, and allows file updates to be
// batched together if needed.
save_counters_thread_ = std::make_unique<StdThread>(
"SaveCountersThread",
absl::bind_front(&CperMachineState::SaveCountersToFileThread, this));
absl::Status status = save_counters_thread_->Start();
if (!status.ok()) {
LOG(ERROR) << "Failed to start save counters thread, status: " << status;
}
}
CperMachineState::~CperMachineState() {
if (save_counters_thread_ != nullptr) {
{
absl::MutexLock lock(mutex_);
counter_state_ = CounterState::kShutdown;
}
save_counters_thread_->Join();
}
}
// Returns false if the counter state is shutdown, true if the full sleep
// duration expired.
bool CperMachineState::InterruptibleSleep(absl::Duration duration) {
auto is_shutdown = [this]() ABSL_SHARED_LOCKS_REQUIRED(mutex_) {
return counter_state_ == CounterState::kShutdown;
};
absl::MutexLock lock(mutex_);
return !mutex_.AwaitWithTimeout(absl::Condition(&is_shutdown), duration);
}
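// Accumulates the event's memory/CPU error counts for its origin and marks the
// counter state pending so the background thread persists the new values.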
absl::Status CperMachineState::UpdateCounters(
const Event::CperEventData& cper_event_data) {
if (cper_event_data.origin.empty()) {
return absl::InvalidArgumentError("No origin found");
}
absl::MutexLock lock(mutex_);
if (counter_state_ == CounterState::kShutdown) {
return absl::CancelledError("Shutting down counters thread");
}
if (cper_event_data.memory_errors > 0) {
memory_error_counters_[cper_event_data.origin] +=
cper_event_data.memory_errors;
}
if (cper_event_data.cpu_errors > 0) {
cpu_error_counters_[cper_event_data.origin] += cper_event_data.cpu_errors;
}
// This will trigger the counters thread to save the counters to file. This
// may happen multiple times in a row, in which case the counters thread will
// just save the latest values instead of writing multiple times in quick
// succession.
counter_state_ = CounterState::kPending;
return absl::OkStatus();
}
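// Thread body: waits for pending counter updates, snapshots them as JSON, and
// writes them to the counter file, retrying after save_retry_interval_ on
// failure, until shutdown.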
void CperMachineState::SaveCountersToFileThread() {
VLOG(kVlogVerbosity) << "SaveCountersToFileThread";
while (true) {
absl::StatusOr<nlohmann::json> contents = GetCountersFileContents();
if (absl::IsCancelled(contents.status())) {
LOG(INFO) << "Cancelled: " << contents.status();
return;
}
if (!contents.ok()) {
LOG(ERROR) << "Failed to get counters file contrents: "
<< contents.status();
if (!InterruptibleSleep(save_retry_interval_)) break;
continue;
}
absl::Status status = SaveToCountersFile(*contents);
if (!status.ok()) {
LOG(ERROR) << "Failed to save counters to file: " << status;
if (!InterruptibleSleep(save_retry_interval_)) break;
}
}
VLOG(kVlogVerbosity) << "SaveCountersToFileThread exit ";
}
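// Writes `contents` to the temp file and atomically renames it over the
// counter file so readers never observe a partially written file.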
absl::Status CperMachineState::SaveToCountersFile(
const nlohmann::json& contents) {
std::ofstream out(counter_file_temp_path_);
if (!out.is_open()) {
return absl::InternalError(
absl::StrCat("Failed to open counter file temp path for writing: ",
counter_file_temp_path_));
}
out << contents.dump();
out.close();
if (rename(counter_file_temp_path_.c_str(), counter_file_path_.c_str()) !=
0) {
return absl::InternalError(
absl::StrCat("Failed to rename counter file: ", counter_file_temp_path_,
" to ", counter_file_path_));
}
return absl::OkStatus();
}
absl::StatusOr<nlohmann::json> CperMachineState::GetCountersFileContents() {
// Wait until the counters are updated or the thread is shutting down, then
// format the counters into the json object to be saved.
auto not_saved = [this]() ABSL_SHARED_LOCKS_REQUIRED(mutex_) {
return counter_state_ != CounterState::kSaved;
};
absl::MutexLock lock(mutex_, absl::Condition(&not_saved));
if (counter_state_ == CounterState::kShutdown) {
return absl::CancelledError("Shutting down counters thread");
}
nlohmann::json contents = nlohmann::json::object();
for (const auto& [origin, count] : memory_error_counters_) {
contents[origin][memory_error_field_name_] = count > 0 ? 1 : 0;
}
for (const auto& [origin, count] : cpu_error_counters_) {
contents[origin][cpu_error_field_name_] = count > 0 ? 1 : 0;
}
counter_state_ = CounterState::kSaved;
return contents;
}
int CperMachineState::GetMemoryErrorCount(absl::string_view origin) const {
absl::MutexLock lock(mutex_);
auto it = memory_error_counters_.find(origin);
if (it == memory_error_counters_.end()) {
return 0;
}
return it->second;
}
int CperMachineState::GetCpuErrorCount(absl::string_view origin) const {
absl::MutexLock lock(mutex_);
auto it = cpu_error_counters_.find(origin);
if (it == cpu_error_counters_.end()) {
return 0;
}
return it->second;
}
// Updates the machine state for the event and returns true if the machine
// health changed.
bool CperMachineState::UpdateStatus(Event& event, bool is_replay) {
VLOG(kVlogVerbosity) << "UpdateStatus event id: " << event.event_id();
const Event& cache_event = event;
if (!is_replay && event.event_id() != last_event_id_ + 1) {
LOG(ERROR) << "LastEventId was " << last_event_id_
<< " new event received with id " << event.event_id();
}
if (!is_replay && event.event_id() > last_event_id_) {
last_event_id_ = event.event_id();
}
if (event.severity() == EventSeverity::kCritical &&
event.cper_event_data().has_value()) {
if (absl::Status status = UpdateCounters(*event.cper_event_data());
!status.ok()) {
LOG(ERROR) << "UpdateCounters failed: " << status;
}
}
timestamp_last_event_ = absl::Now();
MachineHealth prev_health = health();
MachineHealth new_health = prev_health;
switch (cache_event.severity()) {
case EventSeverity::kOk:
break;
case EventSeverity::kLog:
// Do nothing in case of kLog events.
break;
case EventSeverity::kWarning:
case EventSeverity::kEmergent:
case EventSeverity::kCritical:
// Only set health if new state is higher than previous state.
// i.e. we should not downgrade health state.
auto event_health = GetMachineHealth(cache_event.severity());
if (static_cast<int>(prev_health) < static_cast<int>(event_health)) {
new_health = event_health;
}
break;
}
// Update values on health transition.
if (new_health != prev_health) {
UpdateHealth(new_health);
timestamp_last_health_transition_ = timestamp_last_event_;
event_last_health_transition_ = cache_event;
}
return new_health != prev_health;
}
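// Records an internally generated (non-RMC) event. kOk and kLog severities are
// ignored; kCritical and kWarning update the machine health; the
// last-health-transition timestamp and event are updated for all non-ignored
// severities.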
void CperMachineState::UpdateInternalEvent(absl::string_view event_message,
EventSeverity severity) {
if (severity == EventSeverity::kOk || severity == EventSeverity::kLog) {
return;
}
nlohmann::json json_event;
json_event["MessageSeverity"] = EventSeverityToString(severity);
json_event["EventId"] = "-1";
json_event["EventType"] = "vbmc.Internal";
json_event["Message"] = event_message;
if (severity == EventSeverity::kCritical) {
UpdateHealth(MachineHealth::kCritical);
}
if (severity == EventSeverity::kWarning) {
UpdateHealth(MachineHealth::kWarning);
}
timestamp_last_health_transition_ = absl::Now();
event_last_health_transition_ =
Event(json_event.dump(), /*event_id=*/-1, severity, /*reason_id=*/-1);
}
} // namespace milotic