| #include "tlbmc/collector/sel_collector.h" |
| |
| #include <cstddef> |
| #include <cstdint> |
| #include <filesystem> // NOLINT |
| #include <fstream> |
| #include <ios> |
| #include <limits> |
| #include <memory> |
| #include <optional> |
| #include <ostream> |
| #include <string> |
| #include <system_error> // NOLINT |
| #include <utility> |
| #include <vector> |
| |
| #include "absl/container/btree_map.h" |
| #include "absl/container/flat_hash_map.h" |
| #include "absl/functional/any_invocable.h" |
| #include "absl/log/log.h" |
| #include "absl/memory/memory.h" |
| #include "absl/status/status.h" |
| #include "absl/status/statusor.h" |
| #include "absl/strings/match.h" |
| #include "absl/strings/numbers.h" |
| #include "absl/strings/str_cat.h" |
| #include "absl/strings/string_view.h" |
| #include "absl/strings/substitute.h" |
| #include "absl/synchronization/mutex.h" |
| #include "absl/time/time.h" |
| #include "g3/macros.h" |
| #include "gbmc_sel_sub.h" |
| #include "nlohmann/json.hpp" |
| #include "sel_config.pb.h" |
| |
| namespace milotic_tlbmc { |
| |
| namespace { |
| constexpr absl::string_view kSelFilePrefix = "sel_"; |
| constexpr absl::string_view kSelFileSuffix = ".jsonl"; |
| |
| struct ParsedLogEntry { |
| uint64_t id = 0; |
| nlohmann::json json_event; |
| }; |
| |
| // Parses a JSON log entry from a string. Returns an error if the line is not a |
| // valid JSON event or if the event does not contain an ID. |
| absl::StatusOr<ParsedLogEntry> ParseLogEntry(absl::string_view line) { |
| // Parse the line as a JSON event. |
| nlohmann::json json_event = nlohmann::json::parse(line, nullptr, false); |
| if (json_event.is_discarded() || !json_event.contains("Id")) { |
| return absl::InvalidArgumentError("Invalid log entry."); |
| } |
| uint64_t id = 0; |
| if (!json_event["Id"].is_string()) { |
| return absl::InvalidArgumentError("Log entry ID must be a string."); |
| } |
| std::string id_str = json_event["Id"].get<std::string>(); |
| if (!absl::SimpleAtoi(id_str, &id)) { |
| return absl::InvalidArgumentError("Invalid log entry ID."); |
| } |
| return ParsedLogEntry{ |
| .id = id, |
| .json_event = std::move(json_event), |
| }; |
| } |
| |
| } // namespace |
| |
| absl::StatusOr<std::unique_ptr<SelCollector>> SelCollector::Create( |
| const Params& params) { |
| std::unique_ptr<gbmc_sel_framework::GbmcSelSubscriber> subscriber = |
| gbmc_sel_framework::GbmcSelSubscriber::Create({}, /*skip_history=*/false, |
| /*current_boot_only=*/true); |
| if (!subscriber) { |
| return absl::InternalError("Failed to create SEL subscriber."); |
| } |
| return Create(params, std::move(subscriber)); |
| } |
| |
| absl::StatusOr<std::unique_ptr<SelCollector>> SelCollector::Create( |
| const Params& params, |
| std::unique_ptr<gbmc_sel_framework::GbmcSelSubscriber> subscriber) { |
| if (params.config.max_sel_file() <= 0) { |
| return absl::InvalidArgumentError("max_sel_file must be positive."); |
| } |
| if (params.config.chunk_size() <= 0) { |
| return absl::InvalidArgumentError("chunk_size must be positive."); |
| } |
| if (params.config.sel_save_path().empty()) { |
| return absl::InvalidArgumentError("sel_save_path must be non-empty."); |
| } |
| // Check if the path exists and is a directory. |
| std::filesystem::path sel_save_path(params.config.sel_save_path()); |
| std::error_code ec; |
| if (!std::filesystem::exists(sel_save_path, ec)) { |
| if (ec) { |
| return absl::InternalError(absl::StrCat( |
| "Failed to check if SEL save path exists: ", sel_save_path.string())); |
| } |
| std::filesystem::create_directory(sel_save_path, ec); |
| if (ec) { |
| return absl::InternalError(absl::StrCat( |
| "Failed to create SEL save path: ", sel_save_path.string())); |
| } |
| } |
| if (!std::filesystem::is_directory(sel_save_path, ec)) { |
| if (ec) { |
| return absl::InternalError( |
| absl::StrCat("Failed to check if SEL save path is a directory: ", |
| sel_save_path.string())); |
| } |
| return absl::FailedPreconditionError(absl::StrCat( |
| "SEL save path is not a directory: ", sel_save_path.string())); |
| } |
| if (subscriber == nullptr) { |
| return absl::InvalidArgumentError("Subscriber must be non-null."); |
| } |
| auto thread_manager = std::make_unique<SelThreadManager>(params.clock); |
| std::unique_ptr<SelCollector> collector = absl::WrapUnique(new SelCollector( |
| params.config, std::move(thread_manager), std::move(subscriber))); |
| ECCLESIA_RETURN_IF_ERROR(collector->RecoverExistingSelFiles()); |
| return collector; |
| } |
| |
| SelCollector::SelCollector( |
| const SelConfig& config, std::unique_ptr<SelThreadManager> thread_manager, |
| std::unique_ptr<gbmc_sel_framework::GbmcSelSubscriber> subscriber) |
| : config_(config), |
| thread_manager_(std::move(thread_manager)), |
| default_subscriber_(std::move(subscriber)) {} |
| |
| SelCollector::~SelCollector() { Stop(); } |
| |
| void SelCollector::Stop() { |
| if (default_subscriber_) { |
| default_subscriber_->Stop(); |
| } |
| if (thread_manager_) { |
| thread_manager_->task_scheduler->Stop(); |
| } |
| } |
| |
| absl::Status SelCollector::StartDefaultSubscriber() { |
| if (default_subscriber_ == nullptr) { |
| return absl::InternalError("No default subscriber to start."); |
| } |
| thread_manager_->task_ids.push_back( |
| thread_manager_->task_scheduler->ScheduleAsync( |
| [this](absl::AnyInvocable<void()> on_done) { |
| DefaultSubscriberTask(); |
| on_done(); |
| }, |
| absl::Microseconds(100), absl::InfiniteDuration())); |
| return absl::OkStatus(); |
| } |
| |
| void SelCollector::DefaultSubscriberTask() { |
| if (std::optional<absl::flat_hash_map<std::string, std::string>> event = |
| default_subscriber_->Next(); |
| event != std::nullopt) { |
| WriteSelToFile(*event); |
| // Process all events in the journal, Next(0) will not block if there are |
| // no more events in the journal. |
| while (std::optional<absl::flat_hash_map<std::string, std::string>> event = |
| default_subscriber_->Next(0)) { |
| WriteSelToFile(*event); |
| } |
| |
| // Flush the file to ensure that the events are written to the file. |
| current_sel_file_.flush(); |
| } |
| } |
| |
| void SelCollector::WriteSelToFile( |
| const absl::flat_hash_map<std::string, std::string>& event) { |
| if (current_sel_file_count_ >= config_.chunk_size() || |
| !current_sel_file_.is_open()) { |
| RotateSelFile(); |
| } |
| // convert to json |
| nlohmann::json json_event = event; |
| |
| // Add the SEL ID to the json event. |
| json_event["Id"] = |
| std::to_string(current_sel_file_start_id_ + current_sel_file_count_); |
| |
| current_sel_file_count_++; |
| current_sel_file_ << json_event.dump() << "\n"; |
| } |
| |
| void SelCollector::RotateSelFile() { |
| if (current_sel_file_.is_open()) { |
| current_sel_file_.close(); |
| } |
| |
| if (current_sel_file_count_ > 0) { |
| current_sel_file_start_id_ += current_sel_file_count_; |
| } |
| |
| { |
| absl::MutexLock lock(mutex_); |
| while (existing_sel_files_.size() >= config_.max_sel_file()) { |
| // Remove the oldest SEL file. |
| absl::btree_map<uint64_t, std::string>::iterator it = |
| existing_sel_files_.begin(); |
| std::error_code ec; |
| // Remove the oldest SEL file from the filesystem. |
| std::filesystem::remove(it->second, ec); |
| if (ec) { |
| LOG(ERROR) << "Failed to remove SEL file: " << it->second; |
| } |
| existing_sel_files_.erase(it); |
| } |
| } |
| |
| std::string sel_file_path = absl::Substitute( |
| "$0/sel_$1.jsonl", config_.sel_save_path(), current_sel_file_start_id_); |
| current_sel_file_.open(sel_file_path, std::ios::out); |
| if (!current_sel_file_.is_open()) { |
| LOG(ERROR) << "Failed to open SEL file: " << sel_file_path; |
| return; |
| } |
| current_sel_file_count_ = 0; |
| { |
| absl::MutexLock lock(mutex_); |
| existing_sel_files_[current_sel_file_start_id_] = sel_file_path; |
| } |
| } |
| |
| absl::Status SelCollector::RecoverExistingSelFiles() { |
| uint64_t max_id = 0; |
| bool found_any_sel_file = false; |
| std::string last_file_path; |
| std::error_code ec; |
| auto it = std::filesystem::directory_iterator(config_.sel_save_path(), ec); |
| if (ec) { |
| return absl::InternalError(absl::StrCat( |
| "Failed to iterate over SEL save path: ", config_.sel_save_path())); |
| } |
| auto end = std::filesystem::directory_iterator(); |
| for (; it != end; it.increment(ec)) { |
| const std::filesystem::directory_entry& entry = *it; |
| |
| std::error_code reg_ec; |
| bool is_reg = entry.is_regular_file(reg_ec); |
| if (reg_ec) { |
| LOG(ERROR) << "Error checking is_regular_file for path '" |
| << entry.path().string() << "': " << reg_ec.message(); |
| continue; |
| } |
| if (!is_reg) { |
| continue; |
| } |
| std::string file_name = entry.path().filename().string(); |
| |
| if (file_name.size() <= kSelFilePrefix.size() + kSelFileSuffix.size() || |
| !absl::StartsWith(file_name, kSelFilePrefix) || |
| !absl::EndsWith(file_name, kSelFileSuffix)) { |
| continue; |
| } |
| |
| // Extract the ID from the file name. |
| absl::string_view id_str = file_name; |
| id_str.remove_prefix(kSelFilePrefix.size()); |
| id_str.remove_suffix(kSelFileSuffix.size()); |
| uint64_t id = 0; |
| if (!absl::SimpleAtoi(id_str, &id)) { |
| LOG(ERROR) << "Failed to parse SEL file ID: " << file_name; |
| continue; |
| } |
| |
| { |
| absl::MutexLock lock(mutex_); |
| existing_sel_files_[id] = entry.path().string(); |
| } |
| found_any_sel_file = true; |
| if (last_file_path.empty() || id > max_id) { |
| max_id = id; |
| last_file_path = entry.path().string(); |
| } |
| } |
| if (ec) { |
| return absl::InternalError(absl::StrCat( |
| "Failed to iterate over SEL save path: ", config_.sel_save_path())); |
| } |
| if (found_any_sel_file) { |
| uint64_t num_sel_events = 0; |
| std::ifstream file(last_file_path); |
| if (file.is_open()) { |
| std::string line; |
| while (std::getline(file, line)) { |
| num_sel_events++; |
| } |
| current_sel_file_start_id_ = max_id + num_sel_events; |
| } else { |
| return absl::InternalError( |
| absl::StrCat("Failed to open last SEL file: ", last_file_path)); |
| } |
| } else { |
| current_sel_file_start_id_ = 0; |
| } |
| return absl::OkStatus(); |
| } |
| |
| std::vector<nlohmann::json> SelCollector::GetSelEntriesForward( |
| uint64_t start_id, size_t max_count) const { |
| std::vector<nlohmann::json> results; |
| if (max_count == 0) { |
| return results; |
| } |
| |
| // Get the list of file paths to read from. |
| std::vector<std::string> file_paths; |
| { |
| absl::MutexLock lock(mutex_); |
| if (existing_sel_files_.empty()) { |
| return results; |
| } |
| |
| // Find the file that most likely contains the start_id (i.e., the file with |
| // the largest starting ID that is <= start_id). If start_id is older than |
| // all existing files, it defaults to the oldest available file. |
| auto it = existing_sel_files_.upper_bound(start_id); |
| if (it != existing_sel_files_.begin()) { |
| --it; |
| } |
| |
| // Add all files from the starting file to the newest file. |
| while (it != existing_sel_files_.end()) { |
| file_paths.push_back(it->second); |
| ++it; |
| } |
| } |
| |
| for (const std::string& file_path : file_paths) { |
| if (results.size() >= max_count) break; |
| |
| std::ifstream file(file_path); |
| if (!file.is_open()) { |
| LOG(WARNING) << "Failed to open SEL file: " << file_path; |
| continue; |
| } |
| |
| // Read the file line by line, and parse each line as a JSON event. |
| // Skip any invalid JSON events or events that do not contain an ID. |
| // Only add events that have an ID greater than or equal to start_id. |
| std::string line; |
| while (std::getline(file, line) && results.size() < max_count) { |
| absl::StatusOr<ParsedLogEntry> parsed_log_entry = ParseLogEntry(line); |
| if (parsed_log_entry.ok() && parsed_log_entry->id >= start_id) { |
| results.push_back(std::move(parsed_log_entry->json_event)); |
| } |
| } |
| } |
| |
| return results; |
| } |
| |
| std::vector<nlohmann::json> SelCollector::GetSelEntriesBackward( |
| uint64_t start_id, size_t max_count) const { |
| std::vector<nlohmann::json> results; |
| if (max_count == 0) { |
| return results; |
| } |
| |
| std::vector<std::string> file_paths; |
| { |
| absl::MutexLock lock(mutex_); |
| if (existing_sel_files_.empty()) { |
| return results; |
| } |
| |
| auto it = existing_sel_files_.upper_bound(start_id); |
| if (it != existing_sel_files_.begin()) { |
| --it; |
| } |
| |
| // Add all files from the starting file to the oldest file. |
| while (true) { |
| file_paths.push_back(it->second); |
| if (it == existing_sel_files_.begin()) { |
| break; |
| } |
| --it; |
| } |
| } |
| |
| for (const std::string& file_path : file_paths) { |
| if (results.size() >= max_count) break; |
| |
| std::ifstream file(file_path); |
| if (!file.is_open()) { |
| LOG(WARNING) << "Failed to open SEL file: " << file_path; |
| continue; |
| } |
| |
| // Read the whole file line by line, then iterate in reverse order. |
| // Only add events that have an ID less than or equal to start_id. |
| std::vector<std::string> file_lines; |
| std::string line; |
| while (std::getline(file, line)) { |
| file_lines.push_back(std::move(line)); |
| } |
| |
| for (auto file_it = file_lines.rbegin(); |
| file_it != file_lines.rend() && results.size() < max_count; |
| ++file_it) { |
| absl::StatusOr<ParsedLogEntry> parsed_log_entry = ParseLogEntry(*file_it); |
| if (parsed_log_entry.ok() && parsed_log_entry->id <= start_id) { |
| results.push_back(std::move(parsed_log_entry->json_event)); |
| } |
| } |
| } |
| |
| return results; |
| } |
| |
| std::vector<nlohmann::json> SelCollector::GetLatestSelEntries( |
| size_t max_count) const { |
| return GetSelEntriesBackward(std::numeric_limits<uint64_t>::max(), max_count); |
| } |
| |
| void SelCollector::DumpSelEntriesRaw(uint64_t start_id, |
| std::ostream& os) const { |
| std::vector<std::string> file_paths; |
| { |
| absl::MutexLock lock(mutex_); |
| if (existing_sel_files_.empty()) { |
| return; |
| } |
| |
| auto it = existing_sel_files_.upper_bound(start_id); |
| if (it != existing_sel_files_.begin()) { |
| --it; |
| } |
| |
| while (it != existing_sel_files_.end()) { |
| file_paths.push_back(it->second); |
| ++it; |
| } |
| } |
| |
| // Phase 1: "Catch-Up" on the very first file. |
| // We must read line-by-line because it might contain IDs < start_id. |
| std::ifstream first_file(file_paths[0]); |
| if (first_file.is_open()) { |
| std::string line; |
| while (std::getline(first_file, line)) { |
| nlohmann::json json_event = nlohmann::json::parse(line, nullptr, false); |
| if (json_event.is_discarded() || !json_event.contains("Id")) { |
| continue; |
| } |
| |
| uint64_t id = 0; |
| std::string id_str = json_event["Id"].get<std::string>(); |
| if (absl::SimpleAtoi(id_str, &id) && id >= start_id) { |
| os << line << "\n"; |
| |
| os << first_file.rdbuf(); |
| // We've dumped all relevant entries from this file, so we can exit |
| // early. |
| break; |
| } |
| } |
| } else { |
| LOG(WARNING) << "Failed to open SEL file: " << file_paths[0]; |
| } |
| |
| // Phase 2: "Bulk Dump" on all remaining files. |
| // Every log in these files is guaranteed to be > start_id, so we stream the |
| // raw buffer. |
| for (size_t i = 1; i < file_paths.size(); ++i) { |
| std::ifstream file(file_paths[i]); |
| if (!file.is_open()) { |
| LOG(WARNING) << "Failed to open SEL file: " << file_paths[i]; |
| continue; |
| } |
| os << file.rdbuf(); |
| } |
| } |
| |
| nlohmann::json SelCollector::ToJson() const { |
| return nlohmann::json::parse("{\"Warning\": \"EmptySelCollector used.\"}"); |
| } |
| |
| nlohmann::json SelCollector::GetSchedulerStats() const { |
| return thread_manager_->task_scheduler->ToJson(); |
| } |
| |
| std::unique_ptr<EmptySelCollector> EmptySelCollector::Create() { |
| return std::make_unique<EmptySelCollector>(); |
| } |
| |
| absl::Status EmptySelCollector::StartDefaultSubscriber() { |
| return absl::OkStatus(); |
| } |
| |
| std::vector<nlohmann::json> EmptySelCollector::GetSelEntriesForward( |
| uint64_t start_id, size_t max_count) const { |
| return std::vector<nlohmann::json>(); |
| } |
| |
| std::vector<nlohmann::json> EmptySelCollector::GetSelEntriesBackward( |
| uint64_t start_id, size_t max_count) const { |
| return std::vector<nlohmann::json>(); |
| } |
| |
| std::vector<nlohmann::json> EmptySelCollector::GetLatestSelEntries( |
| size_t max_count) const { |
| return std::vector<nlohmann::json>(); |
| } |
| |
| void EmptySelCollector::DumpSelEntriesRaw(uint64_t start_id, |
| std::ostream& os) const { |
| // Empty collector, do nothing |
| } |
| |
| nlohmann::json EmptySelCollector::ToJson() const { |
| return nlohmann::json::parse("{\"Warning\": \"EmptySelCollector used.\"}"); |
| } |
| |
| nlohmann::json EmptySelCollector::GetSchedulerStats() const { |
| return nlohmann::json::parse("{\"Warning\": \"EmptySelCollector used.\"}"); |
| } |
| |
| } // namespace milotic_tlbmc |