blob: e17400adb7156613e929287ad26e129bad9b77ca [file] [edit]
#include "tlbmc/collector/sel_collector.h"
#include <cstddef>
#include <cstdint>
#include <filesystem> // NOLINT
#include <fstream>
#include <ios>
#include <limits>
#include <memory>
#include <optional>
#include <ostream>
#include <string>
#include <system_error> // NOLINT
#include <utility>
#include <vector>
#include "absl/container/btree_map.h"
#include "absl/container/flat_hash_map.h"
#include "absl/functional/any_invocable.h"
#include "absl/log/log.h"
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/match.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/strings/substitute.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "g3/macros.h"
#include "gbmc_sel_sub.h"
#include "nlohmann/json.hpp"
#include "sel_config.pb.h"
namespace milotic_tlbmc {
namespace {
// Leading part of an on-disk SEL file name. RecoverExistingSelFiles() only
// considers directory entries whose file name starts with this prefix.
constexpr absl::string_view kSelFilePrefix = "sel_";
// Trailing part (extension) of an on-disk SEL file name. The text between the
// prefix and this suffix is parsed as the numeric ID the file is keyed by.
constexpr absl::string_view kSelFileSuffix = ".jsonl";
// Result of successfully parsing one line of a SEL file with ParseLogEntry().
struct ParsedLogEntry {
// Numeric value of the entry's "Id" field (stored in the file as a decimal
// string).
uint64_t id = 0;
// The complete JSON object parsed from the line, including the "Id" field.
nlohmann::json json_event;
};
// Decodes one line of a SEL file into a ParsedLogEntry.
//
// The line must be valid JSON containing an "Id" member whose value is a
// string holding an unsigned decimal integer. Returns InvalidArgumentError
// when the line is not valid JSON, has no "Id", when "Id" is not a string, or
// when the string cannot be converted to a uint64_t.
absl::StatusOr<ParsedLogEntry> ParseLogEntry(absl::string_view line) {
  // Exceptions are disabled for the parse; failures surface as a "discarded"
  // value instead.
  nlohmann::json event = nlohmann::json::parse(line, nullptr, false);
  const bool has_id_member = !event.is_discarded() && event.contains("Id");
  if (!has_id_member) {
    return absl::InvalidArgumentError("Invalid log entry.");
  }

  nlohmann::json& id_field = event["Id"];
  if (!id_field.is_string()) {
    return absl::InvalidArgumentError("Log entry ID must be a string.");
  }

  uint64_t numeric_id = 0;
  const std::string id_text = id_field.get<std::string>();
  if (!absl::SimpleAtoi(id_text, &numeric_id)) {
    return absl::InvalidArgumentError("Invalid log entry ID.");
  }

  ParsedLogEntry parsed;
  parsed.id = numeric_id;
  parsed.json_event = std::move(event);
  return parsed;
}
} // namespace
// Builds a SelCollector that owns a newly created GbmcSelSubscriber.
//
// The subscriber is created with an empty first argument, skip_history=false
// and current_boot_only=true, and is then passed to the two-argument Create()
// overload, which does all remaining validation and setup.
// Returns InternalError when the subscriber factory yields a null pointer.
absl::StatusOr<std::unique_ptr<SelCollector>> SelCollector::Create(
    const Params& params) {
  std::unique_ptr<gbmc_sel_framework::GbmcSelSubscriber> new_subscriber =
      gbmc_sel_framework::GbmcSelSubscriber::Create(
          {}, /*skip_history=*/false, /*current_boot_only=*/true);
  if (new_subscriber == nullptr) {
    return absl::InternalError("Failed to create SEL subscriber.");
  }
  return Create(params, std::move(new_subscriber));
}
// Builds a SelCollector that reads events from `subscriber` and stores them
// under `params.config.sel_save_path()`.
//
// All arguments are validated before the filesystem is touched, so an invalid
// call never leaves a newly created directory behind.
//
// Returns:
//   - InvalidArgumentError if max_sel_file or chunk_size is not positive, if
//     sel_save_path is empty, or if `subscriber` is null.
//   - InternalError if the save path cannot be inspected or created, or if
//     recovering the existing SEL files fails with that code.
//   - FailedPreconditionError if the save path exists but is not a directory.
//   - The new collector otherwise.
absl::StatusOr<std::unique_ptr<SelCollector>> SelCollector::Create(
    const Params& params,
    std::unique_ptr<gbmc_sel_framework::GbmcSelSubscriber> subscriber) {
  if (params.config.max_sel_file() <= 0) {
    return absl::InvalidArgumentError("max_sel_file must be positive.");
  }
  if (params.config.chunk_size() <= 0) {
    return absl::InvalidArgumentError("chunk_size must be positive.");
  }
  if (params.config.sel_save_path().empty()) {
    return absl::InvalidArgumentError("sel_save_path must be non-empty.");
  }
  // Checked before any filesystem work so that a bad call has no side effects.
  if (subscriber == nullptr) {
    return absl::InvalidArgumentError("Subscriber must be non-null.");
  }

  // Make sure the save path exists and is a directory, creating it if needed.
  // Only the final path component is created; missing parents are an error.
  const std::filesystem::path sel_save_path(params.config.sel_save_path());
  std::error_code ec;
  if (!std::filesystem::exists(sel_save_path, ec)) {
    if (ec) {
      return absl::InternalError(
          absl::StrCat("Failed to check if SEL save path exists: ",
                       sel_save_path.string(), ": ", ec.message()));
    }
    std::filesystem::create_directory(sel_save_path, ec);
    if (ec) {
      return absl::InternalError(
          absl::StrCat("Failed to create SEL save path: ",
                       sel_save_path.string(), ": ", ec.message()));
    }
  }
  if (!std::filesystem::is_directory(sel_save_path, ec)) {
    if (ec) {
      return absl::InternalError(
          absl::StrCat("Failed to check if SEL save path is a directory: ",
                       sel_save_path.string(), ": ", ec.message()));
    }
    return absl::FailedPreconditionError(absl::StrCat(
        "SEL save path is not a directory: ", sel_save_path.string()));
  }

  auto thread_manager = std::make_unique<SelThreadManager>(params.clock);
  // WrapUnique + new is used instead of make_unique; presumably the
  // constructor is not publicly accessible -- confirm against the header.
  std::unique_ptr<SelCollector> collector = absl::WrapUnique(new SelCollector(
      params.config, std::move(thread_manager), std::move(subscriber)));
  // Index the files left by a previous run so new IDs continue after them.
  ECCLESIA_RETURN_IF_ERROR(collector->RecoverExistingSelFiles());
  return collector;
}
// Stores `config` and takes ownership of `thread_manager` and `subscriber`.
// The body is empty: no validation or I/O happens here. The Create() factory
// validates its arguments before calling this constructor.
SelCollector::SelCollector(
const SelConfig& config, std::unique_ptr<SelThreadManager> thread_manager,
std::unique_ptr<gbmc_sel_framework::GbmcSelSubscriber> subscriber)
: config_(config),
thread_manager_(std::move(thread_manager)),
default_subscriber_(std::move(subscriber)) {}
// Calls Stop() so the subscriber and the task scheduler are stopped before the
// members are destroyed.
SelCollector::~SelCollector() { Stop(); }
// Stops the default subscriber first and then the task scheduler. Either step
// is skipped when the corresponding owner pointer is null.
void SelCollector::Stop() {
  if (default_subscriber_ != nullptr) {
    default_subscriber_->Stop();
  }
  if (thread_manager_ != nullptr) {
    thread_manager_->task_scheduler->Stop();
  }
}
// Schedules DefaultSubscriberTask() on the thread manager's task scheduler and
// records the returned task ID in `thread_manager_->task_ids`.
//
// Returns InternalError when there is no default subscriber, OkStatus
// otherwise.
absl::Status SelCollector::StartDefaultSubscriber() {
  if (!default_subscriber_) {
    return absl::InternalError("No default subscriber to start.");
  }

  // One scheduler invocation: process pending events, then signal completion.
  // The lambda captures `this`; Stop() (called from the destructor) stops the
  // scheduler.
  auto poll_and_signal = [this](absl::AnyInvocable<void()> on_done) {
    DefaultSubscriberTask();
    on_done();
  };

  // NOTE(review): the two durations are passed through unchanged; their exact
  // meaning (period / timeout) is defined by ScheduleAsync -- confirm there.
  auto scheduled_id = thread_manager_->task_scheduler->ScheduleAsync(
      std::move(poll_and_signal), absl::Microseconds(100),
      absl::InfiniteDuration());
  thread_manager_->task_ids.push_back(std::move(scheduled_id));
  return absl::OkStatus();
}
void SelCollector::DefaultSubscriberTask() {
if (std::optional<absl::flat_hash_map<std::string, std::string>> event =
default_subscriber_->Next();
event != std::nullopt) {
WriteSelToFile(*event);
// Process all events in the journal, Next(0) will not block if there are
// no more events in the journal.
while (std::optional<absl::flat_hash_map<std::string, std::string>> event =
default_subscriber_->Next(0)) {
WriteSelToFile(*event);
}
// Flush the file to ensure that the events are written to the file.
current_sel_file_.flush();
}
}
// Appends `event` to the current SEL file as one JSON line.
//
// A new file is started first when the current one already holds
// `config_.chunk_size()` entries or when no file is open. The entry receives
// an "Id" member equal to the current file's starting ID plus the number of
// entries already counted for it, stored as a decimal string. The stream is
// not flushed here.
void SelCollector::WriteSelToFile(
    const absl::flat_hash_map<std::string, std::string>& event) {
  const bool chunk_is_full = current_sel_file_count_ >= config_.chunk_size();
  if (chunk_is_full || !current_sel_file_.is_open()) {
    RotateSelFile();
  }

  // Every key/value pair of the event becomes a string member of the record.
  nlohmann::json record = event;

  const auto entry_id = current_sel_file_start_id_ + current_sel_file_count_;
  record["Id"] = std::to_string(entry_id);
  ++current_sel_file_count_;

  current_sel_file_ << record.dump() << "\n";
}
void SelCollector::RotateSelFile() {
if (current_sel_file_.is_open()) {
current_sel_file_.close();
}
if (current_sel_file_count_ > 0) {
current_sel_file_start_id_ += current_sel_file_count_;
}
{
absl::MutexLock lock(mutex_);
while (existing_sel_files_.size() >= config_.max_sel_file()) {
// Remove the oldest SEL file.
absl::btree_map<uint64_t, std::string>::iterator it =
existing_sel_files_.begin();
std::error_code ec;
// Remove the oldest SEL file from the filesystem.
std::filesystem::remove(it->second, ec);
if (ec) {
LOG(ERROR) << "Failed to remove SEL file: " << it->second;
}
existing_sel_files_.erase(it);
}
}
std::string sel_file_path = absl::Substitute(
"$0/sel_$1.jsonl", config_.sel_save_path(), current_sel_file_start_id_);
current_sel_file_.open(sel_file_path, std::ios::out);
if (!current_sel_file_.is_open()) {
LOG(ERROR) << "Failed to open SEL file: " << sel_file_path;
return;
}
current_sel_file_count_ = 0;
{
absl::MutexLock lock(mutex_);
existing_sel_files_[current_sel_file_start_id_] = sel_file_path;
}
}
// Rebuilds the in-memory index of SEL files from the save directory and works
// out the ID the next written entry should get.
//
// Every regular file named "sel_<number>.jsonl" is added to
// `existing_sel_files_`, keyed by <number>. Entries that cannot be inspected or
// whose number does not parse are logged and skipped; other names are ignored
// silently. The next starting ID is the largest <number> found plus the line
// count of that file, or 0 when no SEL file exists.
//
// Returns InternalError if the directory cannot be iterated or the newest SEL
// file cannot be opened.
absl::Status SelCollector::RecoverExistingSelFiles() {
  std::error_code iter_ec;
  auto dir_it =
      std::filesystem::directory_iterator(config_.sel_save_path(), iter_ec);
  if (iter_ec) {
    return absl::InternalError(absl::StrCat(
        "Failed to iterate over SEL save path: ", config_.sel_save_path()));
  }

  const size_t fixed_name_length =
      kSelFilePrefix.size() + kSelFileSuffix.size();
  uint64_t newest_id = 0;
  std::string newest_path;
  bool any_sel_file = false;

  const auto dir_end = std::filesystem::directory_iterator();
  for (; dir_it != dir_end; dir_it.increment(iter_ec)) {
    const std::filesystem::directory_entry& entry = *dir_it;

    std::error_code type_ec;
    const bool regular = entry.is_regular_file(type_ec);
    if (type_ec) {
      LOG(ERROR) << "Error checking is_regular_file for path '"
                 << entry.path().string() << "': " << type_ec.message();
      continue;
    }
    if (!regular) {
      continue;
    }

    // Accept only names of the form <prefix><at least one char><suffix>.
    const std::string file_name = entry.path().filename().string();
    const bool named_like_sel_file =
        file_name.size() > fixed_name_length &&
        absl::StartsWith(file_name, kSelFilePrefix) &&
        absl::EndsWith(file_name, kSelFileSuffix);
    if (!named_like_sel_file) {
      continue;
    }

    // The part between prefix and suffix is the file's starting ID.
    const absl::string_view id_text = absl::string_view(file_name).substr(
        kSelFilePrefix.size(), file_name.size() - fixed_name_length);
    uint64_t file_id = 0;
    if (!absl::SimpleAtoi(id_text, &file_id)) {
      LOG(ERROR) << "Failed to parse SEL file ID: " << file_name;
      continue;
    }

    const std::string full_path = entry.path().string();
    {
      absl::MutexLock lock(mutex_);
      existing_sel_files_[file_id] = full_path;
    }
    any_sel_file = true;
    if (newest_path.empty() || file_id > newest_id) {
      newest_id = file_id;
      newest_path = full_path;
    }
  }
  // A failed increment ends the loop; report it instead of using partial data.
  if (iter_ec) {
    return absl::InternalError(absl::StrCat(
        "Failed to iterate over SEL save path: ", config_.sel_save_path()));
  }

  if (!any_sel_file) {
    current_sel_file_start_id_ = 0;
    return absl::OkStatus();
  }

  std::ifstream newest_file(newest_path);
  if (!newest_file.is_open()) {
    return absl::InternalError(
        absl::StrCat("Failed to open last SEL file: ", newest_path));
  }
  // Each line in the newest file is one stored entry, so the next ID is that
  // file's starting ID plus its line count.
  uint64_t stored_entries = 0;
  for (std::string scratch; std::getline(newest_file, scratch);) {
    ++stored_entries;
  }
  current_sel_file_start_id_ = newest_id + stored_entries;
  return absl::OkStatus();
}
std::vector<nlohmann::json> SelCollector::GetSelEntriesForward(
uint64_t start_id, size_t max_count) const {
std::vector<nlohmann::json> results;
if (max_count == 0) {
return results;
}
// Get the list of file paths to read from.
std::vector<std::string> file_paths;
{
absl::MutexLock lock(mutex_);
if (existing_sel_files_.empty()) {
return results;
}
// Find the file that most likely contains the start_id (i.e., the file with
// the largest starting ID that is <= start_id). If start_id is older than
// all existing files, it defaults to the oldest available file.
auto it = existing_sel_files_.upper_bound(start_id);
if (it != existing_sel_files_.begin()) {
--it;
}
// Add all files from the starting file to the newest file.
while (it != existing_sel_files_.end()) {
file_paths.push_back(it->second);
++it;
}
}
for (const std::string& file_path : file_paths) {
if (results.size() >= max_count) break;
std::ifstream file(file_path);
if (!file.is_open()) {
LOG(WARNING) << "Failed to open SEL file: " << file_path;
continue;
}
// Read the file line by line, and parse each line as a JSON event.
// Skip any invalid JSON events or events that do not contain an ID.
// Only add events that have an ID greater than or equal to start_id.
std::string line;
while (std::getline(file, line) && results.size() < max_count) {
absl::StatusOr<ParsedLogEntry> parsed_log_entry = ParseLogEntry(line);
if (parsed_log_entry.ok() && parsed_log_entry->id >= start_id) {
results.push_back(std::move(parsed_log_entry->json_event));
}
}
}
return results;
}
std::vector<nlohmann::json> SelCollector::GetSelEntriesBackward(
uint64_t start_id, size_t max_count) const {
std::vector<nlohmann::json> results;
if (max_count == 0) {
return results;
}
std::vector<std::string> file_paths;
{
absl::MutexLock lock(mutex_);
if (existing_sel_files_.empty()) {
return results;
}
auto it = existing_sel_files_.upper_bound(start_id);
if (it != existing_sel_files_.begin()) {
--it;
}
// Add all files from the starting file to the oldest file.
while (true) {
file_paths.push_back(it->second);
if (it == existing_sel_files_.begin()) {
break;
}
--it;
}
}
for (const std::string& file_path : file_paths) {
if (results.size() >= max_count) break;
std::ifstream file(file_path);
if (!file.is_open()) {
LOG(WARNING) << "Failed to open SEL file: " << file_path;
continue;
}
// Read the whole file line by line, then iterate in reverse order.
// Only add events that have an ID less than or equal to start_id.
std::vector<std::string> file_lines;
std::string line;
while (std::getline(file, line)) {
file_lines.push_back(std::move(line));
}
for (auto file_it = file_lines.rbegin();
file_it != file_lines.rend() && results.size() < max_count;
++file_it) {
absl::StatusOr<ParsedLogEntry> parsed_log_entry = ParseLogEntry(*file_it);
if (parsed_log_entry.ok() && parsed_log_entry->id <= start_id) {
results.push_back(std::move(parsed_log_entry->json_event));
}
}
}
return results;
}
// Returns up to `max_count` of the most recent entries, newest first, by
// running a backward query from the largest representable ID.
std::vector<nlohmann::json> SelCollector::GetLatestSelEntries(
    size_t max_count) const {
  constexpr uint64_t kLargestId = std::numeric_limits<uint64_t>::max();
  return GetSelEntriesBackward(kLargestId, max_count);
}
void SelCollector::DumpSelEntriesRaw(uint64_t start_id,
std::ostream& os) const {
std::vector<std::string> file_paths;
{
absl::MutexLock lock(mutex_);
if (existing_sel_files_.empty()) {
return;
}
auto it = existing_sel_files_.upper_bound(start_id);
if (it != existing_sel_files_.begin()) {
--it;
}
while (it != existing_sel_files_.end()) {
file_paths.push_back(it->second);
++it;
}
}
// Phase 1: "Catch-Up" on the very first file.
// We must read line-by-line because it might contain IDs < start_id.
std::ifstream first_file(file_paths[0]);
if (first_file.is_open()) {
std::string line;
while (std::getline(first_file, line)) {
nlohmann::json json_event = nlohmann::json::parse(line, nullptr, false);
if (json_event.is_discarded() || !json_event.contains("Id")) {
continue;
}
uint64_t id = 0;
std::string id_str = json_event["Id"].get<std::string>();
if (absl::SimpleAtoi(id_str, &id) && id >= start_id) {
os << line << "\n";
os << first_file.rdbuf();
// We've dumped all relevant entries from this file, so we can exit
// early.
break;
}
}
} else {
LOG(WARNING) << "Failed to open SEL file: " << file_paths[0];
}
// Phase 2: "Bulk Dump" on all remaining files.
// Every log in these files is guaranteed to be > start_id, so we stream the
// raw buffer.
for (size_t i = 1; i < file_paths.size(); ++i) {
std::ifstream file(file_paths[i]);
if (!file.is_open()) {
LOG(WARNING) << "Failed to open SEL file: " << file_paths[i];
continue;
}
os << file.rdbuf();
}
}
// Returns a fixed JSON object with a single "Warning" member.
// NOTE(review): the text names EmptySelCollector although this is
// SelCollector; it looks copied from EmptySelCollector::ToJson() -- confirm
// what the real collector is meant to report.
nlohmann::json SelCollector::ToJson() const {
  constexpr char kPayload[] = R"({"Warning": "EmptySelCollector used."})";
  return nlohmann::json::parse(kPayload);
}
// Returns the task scheduler's own JSON description, unmodified.
nlohmann::json SelCollector::GetSchedulerStats() const {
  const auto& scheduler = thread_manager_->task_scheduler;
  return scheduler->ToJson();
}
std::unique_ptr<EmptySelCollector> EmptySelCollector::Create() {
return std::make_unique<EmptySelCollector>();
}
// No-op: there is no subscriber to start. Always returns OkStatus.
absl::Status EmptySelCollector::StartDefaultSubscriber() {
return absl::OkStatus();
}
// Always returns an empty list; both arguments are ignored.
std::vector<nlohmann::json> EmptySelCollector::GetSelEntriesForward(
    uint64_t start_id, size_t max_count) const {
  return {};
}
// Always returns an empty list; both arguments are ignored.
std::vector<nlohmann::json> EmptySelCollector::GetSelEntriesBackward(
    uint64_t start_id, size_t max_count) const {
  return {};
}
// Always returns an empty list; `max_count` is ignored.
std::vector<nlohmann::json> EmptySelCollector::GetLatestSelEntries(
    size_t max_count) const {
  return {};
}
// No-op: the empty collector stores no entries, so nothing is written to `os`
// and `start_id` is ignored.
void EmptySelCollector::DumpSelEntriesRaw(uint64_t start_id,
std::ostream& os) const {
// Empty collector, do nothing
}
// Returns a fixed JSON object whose single "Warning" member states that the
// empty collector is in use.
nlohmann::json EmptySelCollector::ToJson() const {
  constexpr char kPayload[] = R"({"Warning": "EmptySelCollector used."})";
  return nlohmann::json::parse(kPayload);
}
// There is no scheduler behind the empty collector; returns the same fixed
// "Warning" object as ToJson().
nlohmann::json EmptySelCollector::GetSchedulerStats() const {
  constexpr char kPayload[] = R"({"Warning": "EmptySelCollector used."})";
  return nlohmann::json::parse(kPayload);
}
} // namespace milotic_tlbmc