| #include "persistent_storage_impl.h" |
| |
#include <fcntl.h>
#include <unistd.h>

#include <algorithm>
#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <filesystem>  // NOLINT(build/c++17)
#include <fstream>
#include <ios>
#include <iterator>
#include <ostream>
#include <string>
#include <system_error>  // NOLINT(build/c++11)
#include <vector>
| |
| #include "google/protobuf/timestamp.pb.h" |
| #include "action_context.h" |
| #include "safepower_agent.pb.h" |
| #include "safepower_agent_config.pb.h" |
| #include "state_persistence.pb.h" |
| #include "proto_reader.h" |
| #include "absl/container/flat_hash_map.h" |
| #include "absl/log/log.h" |
| #include "absl/status/status.h" |
| #include "absl/status/statusor.h" |
| #include "absl/strings/numbers.h" |
| #include "absl/strings/str_cat.h" |
| #include "absl/strings/str_split.h" |
| #include "absl/strings/string_view.h" |
| #include "absl/time/clock.h" |
| #include "absl/time/time.h" |
| #include "absl/types/span.h" |
| #include "google/protobuf/io/zero_copy_stream_impl.h" |
| #include "bmc/status_macros.h" |
| |
| namespace persistent_storage { |
| |
| using safepower_agent::ActionContext; |
| using safepower_agent_persistence_proto::SavedAction; |
| using safepower_agent_persistence_proto::SavedActionRecord; |
| using safepower_agent_persistence_proto::SavedActions; |
| using borg_mgmt::node_proxy::safepower::utils::FlightRecordRequest; |
| |
| inline absl::Status WriteSavedActions(const std::string& proto_path, |
| const SavedActions& in_proto) { |
| int fd = open(proto_path.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0644); |
| if (fd < 0) { |
| int ec = errno; |
| LOG(ERROR) << "Failed to open file " << proto_path << " " << strerror(ec); |
| return absl::UnavailableError(absl::StrCat( |
| "Failed to open file: ", proto_path, ":", strerror(ec))); |
| } |
| |
| google::protobuf::io::FileOutputStream stream(fd); |
| stream.SetCloseOnDelete(true); |
| if (!in_proto.SerializeToZeroCopyStream(&stream)) { |
| int ec = stream.GetErrno(); |
| LOG(ERROR) << "Failed to serialize the proto file" << proto_path |
| << " error:" << strerror(ec); |
| return absl::InternalError( |
| absl::StrCat("Failed to serialize the proto file: ", proto_path, |
| " error:", strerror(ec))); |
| } |
| |
| int ret = fsync(fd); |
| if (ret < 0) { |
| int ec = errno; |
| LOG(ERROR) << "Failed to sync the file " << proto_path << " " |
| << strerror(ec); |
| return absl::DataLossError(absl::StrCat( |
| "Failed to sync the file: ", proto_path, ":", strerror(ec))); |
| } |
| return absl::OkStatus(); |
| } |
| |
| PersistentStorageManagerImpl::PersistentStorageManagerImpl( |
| const safepower_agent_config::PersistentStorageConfig& config) { |
| if (!config.dir_path().empty()) { |
| dir_path_ = config.dir_path(); |
| } |
| if (config.max_file_size() > 0) { |
| max_file_size_ = config.max_file_size(); |
| } |
| if (config.proto_keep_number() > 0) { |
| proto_keep_number_ = config.proto_keep_number(); |
| } |
| } |
| |
| absl::Status PersistentStorageManagerImpl::WriteSavedActionsChange( |
| const SavedActions& actions) { |
| return this->WriteSavedActionsChange(actions, dir_path_); |
| } |
| |
| // use the data in the input arg to make several records |
| // if this is the first time writing the file, write the record log |
| // append the records to the record log, by saving them in wire format |
| // check the file size, |
| // if it is too large, |
| // merge, and convert to map, and back to append log |
| // and rewrite the file |
| |
| absl::Status PersistentStorageManagerImpl::WriteSavedActionsChange( |
| const SavedActions& actions, absl::string_view dir_path) { |
| std::string write_string; |
| |
| auto files = listFiles(dir_path); |
| if (!files.ok()) { |
| return files.status(); |
| } |
| auto id = findLargestFileId(files.value()); |
| if (!id.ok()) { |
| return id.status(); |
| } |
| |
| std::string file_path = MakeFileName(dir_path, id.value()); |
| |
| // write the new record to the file, and close it |
| // this ensures that the data is always written before the file is collated |
| RETURN_IF_ERROR(WriteSavedActions(file_path, actions)); |
| |
| // if the file is too large, merge the files |
| std::error_code ec; |
| std::uintmax_t file_size = std::filesystem::file_size(file_path, ec); |
| if (ec) { |
| return absl::UnavailableError( |
| absl::StrCat("Failed to get file size", ec.message())); |
| } |
| if (file_size > max_file_size_) { |
| LOG(INFO) << "file size too large merging" << std::endl; |
| absl::StatusOr<SavedActions> saved_actions_write_ordered = |
| proto_reader::ReadProto<SavedActions>(file_path); |
| if (!saved_actions_write_ordered.ok()) { |
| return saved_actions_write_ordered.status(); |
| } |
| absl::flat_hash_map<FlightRecordRequest, SavedAction, FlightRecordHash, |
| FlightRecordEq> |
| new_saved_actions_map; |
| absl::Status status = |
| this->ConvertAppendToMap(new_saved_actions_map, |
| *saved_actions_write_ordered); |
| if (!status.ok()) { |
| return status; |
| } |
| |
| // only keep the most recent protos |
| |
| absl::Status keep_status = keepMostRecentProtos(new_saved_actions_map); |
| if (!keep_status.ok()) { |
| return keep_status; |
| } |
| |
| // convert map to append log to save it again |
| SavedActions new_log; |
| for (auto& i : new_saved_actions_map) { |
| SavedActionRecord *ptr_record = new_log.add_saved_action_records(); |
| *(ptr_record->mutable_actions()) = i.second; |
| } |
| |
| std::string next_file = MakeFileName(dir_path, id.value() + 1); |
| RETURN_IF_ERROR(WriteSavedActions(next_file, new_log)); |
| return deleteAllFilesExceptLargest(dir_path); |
| } |
| return absl::OkStatus(); |
| } |
| |
| absl::Status PersistentStorageManagerImpl::ConvertAppendToMap( |
| absl::flat_hash_map<FlightRecordRequest, SavedAction, FlightRecordHash, |
| FlightRecordEq >& map, |
| const SavedActions& saved_actions) { |
| for (const auto& i : saved_actions.saved_action_records()) { |
| auto flight_record = i.actions().original_request().flight_record(); |
| auto [it, inserted] = |
| map.try_emplace(flight_record, i.actions()); |
| if (!inserted) { |
| it->second.MergeFrom(i.actions()); |
| } |
| } |
| return absl::OkStatus(); |
| } |
| |
| absl::StatusOr<SavedActions> PersistentStorageManagerImpl::ReadSavedActions() { |
| return this->ReadSavedActions(kDefaultPath); |
| } |
| |
| absl::StatusOr<SavedActions> PersistentStorageManagerImpl::CompressLogs |
| (const SavedActions& saved_actions) { |
| // convert to map, and back to append log |
| absl::flat_hash_map<FlightRecordRequest, SavedAction, FlightRecordHash, |
| FlightRecordEq> |
| new_saved_actions_map; |
| absl::Status status = |
| this->ConvertAppendToMap(new_saved_actions_map, |
| saved_actions); |
| if (!status.ok()) { |
| return status; |
| } |
| SavedActions new_log; |
| for (auto& i : new_saved_actions_map) { |
| SavedActionRecord *ptr_record = new_log.add_saved_action_records(); |
| *(ptr_record->mutable_actions()) = i.second; |
| } |
| return new_log; |
| } |
| |
| absl::StatusOr<SavedActions> PersistentStorageManagerImpl::ReadSavedActions( |
| absl::string_view dir_path) { |
| auto files = listFiles(dir_path); |
| if (!files.ok()) { |
| return files.status(); |
| } |
| auto id = findLargestFileId(files.value()); |
| if (!id.ok()) { |
| return id.status(); |
| } |
| std::string file_path = MakeFileName(dir_path, id.value()); |
| auto saved_actions = proto_reader::ReadProto<SavedActions>(file_path); |
| absl::StatusOr<SavedActions> compressed_saved_actions; |
| if (saved_actions.ok()) { |
| compressed_saved_actions = CompressLogs(saved_actions.value()); |
| if (compressed_saved_actions.ok()) { |
| return compressed_saved_actions.value(); |
| } |
| } |
| absl::Status original_failing_status = saved_actions.status().ok() ? |
| compressed_saved_actions.status() : |
| saved_actions.status(); |
| |
| LOG(ERROR) << "Unable to read SavedActions proto file file_path:" |
| << file_path << saved_actions.status(); |
| if (id.value() == 0) { |
| // if this is the first file, and we still failed, return the original |
| // failing status |
| return saved_actions.status(); |
| } |
| file_path = MakeFileName(dir_path, id.value() - 1); |
| LOG(ERROR) << "Trying to read older state:" << file_path; |
| auto saved_actions_older = proto_reader::ReadProto<SavedActions>(file_path); |
| if (!saved_actions_older.ok()) { |
| // return the value of the original message, print the status of the |
| // second read, and return the original status |
| LOG(ERROR) << "reading older status failed, (probably file not present)" |
| << saved_actions_older.status(); |
| return original_failing_status; |
| } |
| auto compressed_logs_older = CompressLogs(saved_actions_older.value()); |
| if (!compressed_logs_older.ok()) { |
| LOG(ERROR) << "compressing older logs failed" |
| << compressed_logs_older.status(); |
| return original_failing_status; |
| } |
| return compressed_logs_older.value(); |
| } |
| |
| google::protobuf::Timestamp getChangeTime(const SavedAction& action) { |
| if (!action.has_action_state_log() || |
| (action.action_state_log().history().empty())) { |
| // if there is no history, return the time of the last 24 hours |
| absl::Time now = absl::Now(); |
| absl::Time one_day_ago = now - absl::Hours(24); |
| google::protobuf::Timestamp timestamp; |
| timestamp.set_seconds(absl::ToUnixSeconds(one_day_ago)); |
| return timestamp; |
| } |
| return action.action_state_log().history().rbegin()->changed_at(); |
| } |
| |
| bool AtEndState(const SavedAction& action) { |
| if ((!action.has_action_state_log()) || |
| !action.action_state_log().has_current_state()) { |
| return false; // if there is no state |
| // we should depend on the timing logic to clear it |
| } |
| const safepower_agent_proto::ActionState& state = |
| action.action_state_log().current_state(); |
| return ActionContext::IsFinalState(state); |
| } |
| |
| // when the persistence proto is cleaned up, we need to remove the protos that |
| // we need to keep the most recent N protos |
| absl::Status PersistentStorageManagerImpl::keepMostRecentProtos( |
| absl::flat_hash_map<FlightRecordRequest, SavedAction, FlightRecordHash, |
| FlightRecordEq>& map) { |
| if (map.size() < proto_keep_number_) { |
| LOG(WARNING) << " Assumptions about the size of the" |
| << " saved protos are wrong" << std::endl |
| << "The number of saved protos is " << map.size() |
| << "less than the number we want to keep " |
| << proto_keep_number_; |
| return absl::OkStatus(); |
| } |
| |
| // sort the protos by time |
| std::vector<SavedActionRecord> records; |
| for (const auto& i : map) { |
| SavedActionRecord record; |
| *(record.mutable_actions()) = i.second; |
| records.push_back(record); |
| } |
| std::stable_sort(records.begin(), records.end(), |
| [](const SavedActionRecord& a, const SavedActionRecord& b) { |
| // if state a is not at end state |
| // and state b is at end state |
| // delete b before a, by claiming the order is correct, and |
| // returning true |
| if (!AtEndState(a.actions()) && AtEndState(b.actions())) { |
| return true; |
| } |
| auto time_a = getChangeTime(a.actions()); |
| auto time_b = getChangeTime(b.actions()); |
| // sort in ascending order |
| // keep tail, and delete head |
| return (time_a.seconds() < time_b.seconds()) || |
| (time_a.seconds() == time_b.seconds() && |
| time_a.nanos() < time_b.nanos()); |
| }); |
| |
| // erase the oldest protos, keeping proto_keep_number_ protos |
| LOG(INFO) << "cleaning up the saved protos"; |
| LOG(INFO) << "current:" << records.size(); |
| LOG(INFO) << "keeping:" << proto_keep_number_; |
| const size_t num_to_delete = records.size() - proto_keep_number_; |
| LOG(INFO) << "deleting:" << num_to_delete; |
| if (proto_keep_number_ > records.size()) { |
| LOG(INFO) << "keeping all protos"; |
| return absl::OkStatus(); |
| } |
| auto start = std::next(records.begin(), proto_keep_number_ + 1); |
| // Log the protos that are being deleted for increased retention. |
| for (auto i = start; i != records.end(); i++) { |
| auto action_to_delete = i->actions().original_request().flight_record(); |
| LOG(INFO) << "deleting proto:" << action_to_delete << std::endl; |
| map.erase(action_to_delete); |
| } |
| return absl::OkStatus(); |
| } |
| |
| // find the largest file id, and return it |
| |
| // the file id is a number, and the file name is in the format of |
| // "savedactions_1" |
| // "savedactions_2" |
| // where the largest id is the most recent file |
| absl::StatusOr<uint64_t> PersistentStorageManagerImpl::findLargestFileId( |
| absl::Span<const std::string> files) { |
| std::error_code ec; |
| uint64_t largest_id = 0; |
| for (const auto& entry : files) { |
| std::vector<absl::string_view> file_name_parts = absl::StrSplit(entry, '_'); |
| if (file_name_parts.size() != 2) { |
| LOG(WARNING) << "unable to parse the file name in the directory " << entry |
| << " size:" << file_name_parts.size(); |
| continue; |
| } |
| uint64_t file_id; |
| if (!absl::SimpleAtoi(file_name_parts[1], &file_id)) { |
| LOG(WARNING) << "unable to parse the file ID as unint64_t" << entry; |
| continue; |
| } |
| if (file_name_parts[0] != kfileNamePrefix) { |
| LOG(WARNING) << "unknown file name prefix" << entry; |
| continue; |
| } |
| if (file_id > largest_id) { |
| largest_id = file_id; |
| } |
| } |
| return largest_id; |
| } |
| |
| absl::StatusOr<std::vector<std::string> > |
| PersistentStorageManagerImpl::listFiles(absl::string_view file_path) { |
| if (!std::filesystem::exists(file_path) || |
| !std::filesystem::is_directory(file_path)) { |
| LOG(ERROR) << "Failed to read the directory " << file_path |
| << std::filesystem::exists(file_path) << ":" |
| << std::filesystem::is_directory(file_path); |
| return absl::FailedPreconditionError("Failed to find the directory"); |
| } |
| std::vector<std::string> files; |
| for (const auto& entry : std::filesystem::directory_iterator(file_path)) { |
| files.push_back(entry.path().filename()); |
| } |
| return files; |
| } |
| |
| absl::Status PersistentStorageManagerImpl::deleteAllFilesExceptLargest( |
| absl::string_view file_path) { |
| auto files = listFiles(file_path); |
| if (!files.ok()) { |
| return files.status(); |
| } |
| auto largest = findLargestFileId(files.value()); |
| if (!largest.ok()) { |
| return largest.status(); |
| } |
| |
| std::string largest_file_name = |
| MakeFileName(kfileNamePrefix, largest.value()); |
| for (const auto& entry : files.value()) { |
| if (entry == largest_file_name) { |
| continue; |
| } |
| std::string del_path = absl::StrCat(file_path, entry); |
| if (!std::filesystem::remove(del_path)) { |
| LOG(ERROR) << "Failed to delete the file " << del_path; |
| } |
| } |
| return absl::OkStatus(); |
| } |
| |
| absl::Status PersistentStorageManagerImpl::InitializeSavedActions() { |
| if (!std::filesystem::exists(dir_path_)) { |
| return absl::FailedPreconditionError( |
| absl::StrCat("Directory does not exist: ", dir_path_)); |
| } |
| for (std::filesystem::path file : |
| std::filesystem::directory_iterator(dir_path_)) { |
| if (std::error_code ec; std::filesystem::remove_all(file, ec) == |
| static_cast<std::uintmax_t>(-1)) { |
| return absl::UnavailableError(absl::StrCat( |
| "Failed to delete the file ", file.string(), ":", ec.message())); |
| } |
| LOG(WARNING) << "Deleted " << file.string(); |
| } |
| return absl::OkStatus(); |
| } |
| |
| std::string PersistentStorageManagerImpl::MakeFileName( |
| absl::string_view file_path, uint64_t file_id) { |
| return absl::StrCat(file_path, "/", kfileNamePrefix, "_", |
| std::to_string(file_id)); |
| } |
| } // namespace persistent_storage |