// blob: cae2042e7b7741e7bc5a5da9fd5d8065f58806db [file] [log] [blame]
#include "persistent_storage_impl.h"
#include <algorithm>
#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <filesystem> // NOLINT(build/c++17)
#include <fstream>
#include <ios>
#include <iterator>
#include <ostream>
#include <string>
#include <system_error> // NOLINT(build/c++11)
#include <vector>
#include "google/protobuf/timestamp.pb.h"
#include "action_context.h"
#include "safepower_agent.pb.h"
#include "safepower_agent_config.pb.h"
#include "state_persistence.pb.h"
#include "proto_reader.h"
#include "absl/container/flat_hash_map.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "absl/types/span.h"
#include "google/protobuf/io/zero_copy_stream_impl.h"
#include "bmc/status_macros.h"
namespace persistent_storage {
using safepower_agent::ActionContext;
using safepower_agent_persistence_proto::SavedAction;
using safepower_agent_persistence_proto::SavedActionRecord;
using safepower_agent_persistence_proto::SavedActions;
using borg_mgmt::node_proxy::safepower::utils::FlightRecordRequest;
// Appends the wire-format serialization of `in_proto` to the file at
// `proto_path` (created with mode 0644 if it does not exist) and syncs the
// file to stable storage before returning.
//
// Returns:
//   UnavailableError - the file could not be opened.
//   InternalError    - serializing, or flushing the stream buffer, failed.
//   DataLossError    - fsync() failed.
//   OkStatus         - the bytes were written and synced.
inline absl::Status WriteSavedActions(const std::string& proto_path,
                                      const SavedActions& in_proto) {
  int fd = open(proto_path.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0644);
  if (fd < 0) {
    int ec = errno;
    LOG(ERROR) << "Failed to open file " << proto_path << " " << strerror(ec);
    return absl::UnavailableError(absl::StrCat(
        "Failed to open file: ", proto_path, ":", strerror(ec)));
  }
  // The stream owns the descriptor from here on and closes it on destruction,
  // including on every early return below.
  google::protobuf::io::FileOutputStream stream(fd);
  stream.SetCloseOnDelete(true);
  if (!in_proto.SerializeToZeroCopyStream(&stream)) {
    int ec = stream.GetErrno();
    LOG(ERROR) << "Failed to serialize the proto file" << proto_path
               << " error:" << strerror(ec);
    return absl::InternalError(
        absl::StrCat("Failed to serialize the proto file: ", proto_path,
                     " error:", strerror(ec)));
  }
  // FileOutputStream buffers in user space. The buffer must be pushed to the
  // descriptor before fsync(), otherwise the sync may cover none of the bytes
  // that were just serialized.
  if (!stream.Flush()) {
    int ec = stream.GetErrno();
    LOG(ERROR) << "Failed to flush the proto file " << proto_path
               << " error:" << strerror(ec);
    return absl::InternalError(
        absl::StrCat("Failed to flush the proto file: ", proto_path,
                     " error:", strerror(ec)));
  }
  int ret = fsync(fd);
  if (ret < 0) {
    int ec = errno;
    LOG(ERROR) << "Failed to sync the file " << proto_path << " "
               << strerror(ec);
    return absl::DataLossError(absl::StrCat(
        "Failed to sync the file: ", proto_path, ":", strerror(ec)));
  }
  return absl::OkStatus();
}
// Builds the manager from `config`. Each member is overwritten only when the
// config carries a usable value (non-empty directory, positive size, positive
// keep count); otherwise the member keeps whatever value it already had.
PersistentStorageManagerImpl::PersistentStorageManagerImpl(
    const safepower_agent_config::PersistentStorageConfig& config) {
  if (const auto& configured_dir = config.dir_path();
      !configured_dir.empty()) {
    dir_path_ = configured_dir;
  }
  if (const auto size_limit = config.max_file_size(); size_limit > 0) {
    max_file_size_ = size_limit;
  }
  if (const auto keep_count = config.proto_keep_number(); keep_count > 0) {
    proto_keep_number_ = keep_count;
  }
}
// Convenience overload: persists `actions` into the directory this manager
// was configured with (dir_path_).
absl::Status PersistentStorageManagerImpl::WriteSavedActionsChange(
    const SavedActions& actions) {
  const absl::Status write_status = WriteSavedActionsChange(actions, dir_path_);
  return write_status;
}
// Appends `actions` to the newest record log in `dir_path` and compacts the
// log when it grows past max_file_size_.
//
// Steps:
//   1. Find the newest log file (largest id; id 0 when none exists yet) and
//      append `actions` to it in wire format. The append is flushed and synced
//      before any compaction work starts.
//   2. If the file is now larger than max_file_size_, read it back, merge the
//      records per flight record, trim the result with keepMostRecentProtos(),
//      write the compacted log to the next file id, and delete the other
//      files in the directory.
//
// Returns the first error from directory listing, writing, size lookup,
// reading or compaction; OkStatus otherwise.
absl::Status PersistentStorageManagerImpl::WriteSavedActionsChange(
    const SavedActions& actions, absl::string_view dir_path) {
  auto files = listFiles(dir_path);
  if (!files.ok()) {
    return files.status();
  }
  auto id = findLargestFileId(files.value());
  if (!id.ok()) {
    return id.status();
  }
  const std::string file_path = MakeFileName(dir_path, id.value());
  // Write the new record and close the file first, so the data is on disk
  // before the file is collated.
  RETURN_IF_ERROR(WriteSavedActions(file_path, actions));

  std::error_code ec;
  const std::uintmax_t file_size = std::filesystem::file_size(file_path, ec);
  if (ec) {
    return absl::UnavailableError(absl::StrCat(
        "Failed to get file size: ", file_path, ": ", ec.message()));
  }
  if (file_size <= max_file_size_) {
    return absl::OkStatus();
  }

  // The log is too large: merge it down to one record per flight record.
  LOG(INFO) << "file size too large merging";
  absl::StatusOr<SavedActions> saved_actions_write_ordered =
      proto_reader::ReadProto<SavedActions>(file_path);
  if (!saved_actions_write_ordered.ok()) {
    return saved_actions_write_ordered.status();
  }
  absl::flat_hash_map<FlightRecordRequest, SavedAction, FlightRecordHash,
                      FlightRecordEq>
      new_saved_actions_map;
  RETURN_IF_ERROR(this->ConvertAppendToMap(new_saved_actions_map,
                                           *saved_actions_write_ordered));
  // Only keep the most recent protos.
  RETURN_IF_ERROR(keepMostRecentProtos(new_saved_actions_map));

  // Convert the map back into an append log so it can be saved again.
  SavedActions new_log;
  for (const auto& entry : new_saved_actions_map) {
    SavedActionRecord* record = new_log.add_saved_action_records();
    *(record->mutable_actions()) = entry.second;
  }
  const std::string next_file = MakeFileName(dir_path, id.value() + 1);
  RETURN_IF_ERROR(WriteSavedActions(next_file, new_log));
  return deleteAllFilesExceptLargest(dir_path);
}
// Folds the append-ordered records of `saved_actions` into `map`, keyed by the
// flight record of each action's original request. The first record seen for
// a key is inserted as-is; every later record with an equal key is merged
// into the stored action with MergeFrom(). Entries already in `map` are kept
// and merged into. Always returns OkStatus.
absl::Status PersistentStorageManagerImpl::ConvertAppendToMap(
    absl::flat_hash_map<FlightRecordRequest, SavedAction, FlightRecordHash,
                        FlightRecordEq>& map,
    const SavedActions& saved_actions) {
  for (const auto& record : saved_actions.saved_action_records()) {
    const SavedAction& action = record.actions();
    // Bind by reference: the key is copied only if try_emplace really inserts.
    const FlightRecordRequest& flight_record =
        action.original_request().flight_record();
    auto [it, inserted] = map.try_emplace(flight_record, action);
    if (!inserted) {
      it->second.MergeFrom(action);
    }
  }
  return absl::OkStatus();
}
// Convenience overload: reads the saved actions from the directory this
// manager was configured with. It uses dir_path_, the same directory that
// WriteSavedActionsChange(actions) and InitializeSavedActions() use, so a
// directory supplied through the config is read back rather than ignored.
// NOTE(review): assumes dir_path_ defaults to kDefaultPath when the config
// leaves dir_path empty — confirm against the header.
absl::StatusOr<SavedActions> PersistentStorageManagerImpl::ReadSavedActions() {
  return this->ReadSavedActions(dir_path_);
}
// Collapses an append log into one record per flight record: the records are
// merged into a map with ConvertAppendToMap() and then written back out as a
// fresh SavedActions log. Returns the merge error if the conversion fails.
absl::StatusOr<SavedActions> PersistentStorageManagerImpl::CompressLogs(
    const SavedActions& saved_actions) {
  absl::flat_hash_map<FlightRecordRequest, SavedAction, FlightRecordHash,
                      FlightRecordEq>
      merged_by_flight_record;
  const absl::Status merge_status =
      ConvertAppendToMap(merged_by_flight_record, saved_actions);
  if (!merge_status.ok()) {
    return merge_status;
  }

  // Map -> append log, one record per surviving action.
  SavedActions compacted_log;
  for (auto it = merged_by_flight_record.begin();
       it != merged_by_flight_record.end(); ++it) {
    SavedActionRecord* out_record = compacted_log.add_saved_action_records();
    *out_record->mutable_actions() = it->second;
  }
  return compacted_log;
}
// Reads the saved actions stored in `dir_path`.
//
// The newest log file (largest id) is read and compacted with CompressLogs().
// If reading or compacting it fails and an older id exists, the file with
// id - 1 is tried the same way. If that also fails, the error from the newest
// file is returned; the error from the older file is only logged.
//
// Returns the compacted SavedActions on success, or the directory-listing /
// first read / first compaction error otherwise.
absl::StatusOr<SavedActions> PersistentStorageManagerImpl::ReadSavedActions(
    absl::string_view dir_path) {
  auto files = listFiles(dir_path);
  if (!files.ok()) {
    return files.status();
  }
  auto id = findLargestFileId(files.value());
  if (!id.ok()) {
    return id.status();
  }
  std::string file_path = MakeFileName(dir_path, id.value());
  auto saved_actions = proto_reader::ReadProto<SavedActions>(file_path);

  // Track the status that really failed. When the read succeeds but the
  // compaction fails, saved_actions.status() is OK and must not be returned:
  // a StatusOr may not be built from an OK status.
  absl::Status original_failing_status = saved_actions.status();
  if (saved_actions.ok()) {
    absl::StatusOr<SavedActions> compressed_saved_actions =
        CompressLogs(saved_actions.value());
    if (compressed_saved_actions.ok()) {
      return compressed_saved_actions;
    }
    original_failing_status = compressed_saved_actions.status();
  }
  LOG(ERROR) << "Unable to read SavedActions proto file file_path:"
             << file_path << " " << original_failing_status;
  if (id.value() == 0) {
    // This is the first file, so there is nothing older to fall back to.
    return original_failing_status;
  }

  file_path = MakeFileName(dir_path, id.value() - 1);
  LOG(ERROR) << "Trying to read older state:" << file_path;
  auto saved_actions_older = proto_reader::ReadProto<SavedActions>(file_path);
  if (!saved_actions_older.ok()) {
    // Log the status of the second read, but report the original failure.
    LOG(ERROR) << "reading older status failed, (probably file not present)"
               << saved_actions_older.status();
    return original_failing_status;
  }
  auto compressed_logs_older = CompressLogs(saved_actions_older.value());
  if (!compressed_logs_older.ok()) {
    LOG(ERROR) << "compressing older logs failed"
               << compressed_logs_older.status();
    return original_failing_status;
  }
  return compressed_logs_older;
}
// Returns the `changed_at` timestamp of the last entry in the action's state
// history. When the action has no state log, or the history is empty, returns
// a timestamp 24 hours before the current time (seconds only, nanos unset).
google::protobuf::Timestamp getChangeTime(const SavedAction& action) {
  const bool has_history = action.has_action_state_log() &&
                           !action.action_state_log().history().empty();
  if (has_history) {
    return action.action_state_log().history().rbegin()->changed_at();
  }

  // No history to consult: pretend the action changed one day ago.
  google::protobuf::Timestamp fallback;
  fallback.set_seconds(absl::ToUnixSeconds(absl::Now() - absl::Hours(24)));
  return fallback;
}
bool AtEndState(const SavedAction& action) {
if ((!action.has_action_state_log()) ||
!action.action_state_log().has_current_state()) {
return false; // if there is no state
// we should depend on the timing logic to clear it
}
const safepower_agent_proto::ActionState& state =
action.action_state_log().current_state();
return ActionContext::IsFinalState(state);
}
// Trims `map` down to at most proto_keep_number_ entries when the persistence
// log is being compacted.
//
// Entries are ranked for deletion as follows:
//   1. actions whose current state is final (AtEndState) go before actions
//      that are still in progress;
//   2. within each group, older actions (smaller getChangeTime) go first.
// The first `map.size() - proto_keep_number_` entries of that ranking are
// erased, so the most recent and the unfinished actions survive.
//
// If the map holds fewer entries than proto_keep_number_ a warning is logged
// and nothing is erased. Always returns OkStatus.
absl::Status PersistentStorageManagerImpl::keepMostRecentProtos(
    absl::flat_hash_map<FlightRecordRequest, SavedAction, FlightRecordHash,
                        FlightRecordEq>& map) {
  const size_t keep_count = static_cast<size_t>(proto_keep_number_);
  if (map.size() < keep_count) {
    LOG(WARNING) << " Assumptions about the size of the"
                 << " saved protos are wrong" << '\n'
                 << "The number of saved protos is " << map.size()
                 << "less than the number we want to keep "
                 << proto_keep_number_;
    return absl::OkStatus();
  }

  // Sort key for one map entry. The timestamp and final-state flag are
  // computed once per entry, so the comparator sees stable values
  // (getChangeTime() depends on the current time for entries without history).
  struct Candidate {
    const FlightRecordRequest* key;
    bool at_end_state;
    int64_t seconds;
    int32_t nanos;
  };
  std::vector<Candidate> candidates;
  candidates.reserve(map.size());
  for (const auto& entry : map) {
    const google::protobuf::Timestamp changed_at = getChangeTime(entry.second);
    candidates.push_back(Candidate{&entry.first, AtEndState(entry.second),
                                   changed_at.seconds(), changed_at.nanos()});
  }

  // Strict weak ordering: deletable-first. Finished actions precede
  // unfinished ones; ties are broken oldest-first.
  std::stable_sort(candidates.begin(), candidates.end(),
                   [](const Candidate& a, const Candidate& b) {
                     if (a.at_end_state != b.at_end_state) {
                       return a.at_end_state;
                     }
                     if (a.seconds != b.seconds) {
                       return a.seconds < b.seconds;
                     }
                     return a.nanos < b.nanos;
                   });

  // Erase the head of the ranking, keeping proto_keep_number_ protos.
  const size_t num_to_delete = candidates.size() - keep_count;
  LOG(INFO) << "cleaning up the saved protos";
  LOG(INFO) << "current:" << candidates.size();
  LOG(INFO) << "keeping:" << proto_keep_number_;
  LOG(INFO) << "deleting:" << num_to_delete;
  if (num_to_delete == 0) {
    LOG(INFO) << "keeping all protos";
    return absl::OkStatus();
  }

  // Copy the doomed keys out before erasing, so no erase() call is handed a
  // reference into the element it is about to destroy.
  std::vector<FlightRecordRequest> keys_to_delete;
  keys_to_delete.reserve(num_to_delete);
  for (size_t i = 0; i < num_to_delete; ++i) {
    keys_to_delete.push_back(*candidates[i].key);
  }
  // Log the protos that are being deleted for increased retention.
  for (const FlightRecordRequest& key : keys_to_delete) {
    LOG(INFO) << "deleting proto:" << key;
    map.erase(key);
  }
  return absl::OkStatus();
}
// Returns the largest file id found among `files`.
//
// A file name is expected to look like "<kfileNamePrefix>_<id>", for example
//   "savedactions_1"
//   "savedactions_2"
// where the largest id is the most recent file. Names that do not split into
// exactly two '_'-separated parts, whose id is not a valid uint64, or whose
// prefix is not kfileNamePrefix are skipped with a warning. Returns 0 when no
// name qualifies (including an empty list).
absl::StatusOr<uint64_t> PersistentStorageManagerImpl::findLargestFileId(
    absl::Span<const std::string> files) {
  uint64_t largest_id = 0;
  for (const std::string& entry : files) {
    const std::vector<absl::string_view> file_name_parts =
        absl::StrSplit(entry, '_');
    if (file_name_parts.size() != 2) {
      LOG(WARNING) << "unable to parse the file name in the directory " << entry
                   << " size:" << file_name_parts.size();
      continue;
    }
    uint64_t file_id = 0;
    if (!absl::SimpleAtoi(file_name_parts[1], &file_id)) {
      LOG(WARNING) << "unable to parse the file ID as unint64_t" << entry;
      continue;
    }
    if (file_name_parts[0] != kfileNamePrefix) {
      LOG(WARNING) << "unknown file name prefix" << entry;
      continue;
    }
    largest_id = std::max(largest_id, file_id);
  }
  return largest_id;
}
// Lists the file names (final path component only, no directory) of every
// entry directly inside the directory `file_path`.
//
// Only the non-throwing std::filesystem overloads are used, so filesystem
// failures come back as a Status instead of an exception.
//
// Returns:
//   FailedPreconditionError - `file_path` does not exist or is not a
//                             directory.
//   UnavailableError        - the directory could not be opened or iterated.
absl::StatusOr<std::vector<std::string> >
PersistentStorageManagerImpl::listFiles(absl::string_view file_path) {
  const std::filesystem::path dir{std::string(file_path)};
  std::error_code ec;
  const bool dir_exists = std::filesystem::exists(dir, ec);
  const bool is_dir = dir_exists && std::filesystem::is_directory(dir, ec);
  if (!dir_exists || !is_dir) {
    LOG(ERROR) << "Failed to read the directory " << file_path << dir_exists
               << ":" << is_dir;
    return absl::FailedPreconditionError("Failed to find the directory");
  }

  std::vector<std::string> files;
  std::filesystem::directory_iterator it(dir, ec);
  if (ec) {
    return absl::UnavailableError(absl::StrCat(
        "Failed to open the directory ", file_path, ": ", ec.message()));
  }
  // increment(ec) turns the iterator into the end iterator on error, so the
  // loop stops and the error is reported after it.
  for (const std::filesystem::directory_iterator end; it != end;
       it.increment(ec)) {
    files.push_back(it->path().filename().string());
  }
  if (ec) {
    return absl::UnavailableError(absl::StrCat(
        "Failed to iterate the directory ", file_path, ": ", ec.message()));
  }
  return files;
}
// Deletes every entry in the directory `file_path` except the log file with
// the largest id ("<kfileNamePrefix>_<largest id>").
//
// Deletion is best effort: a file that cannot be removed is logged and
// skipped. Returns an error only when the directory cannot be listed or the
// largest id cannot be determined.
absl::Status PersistentStorageManagerImpl::deleteAllFilesExceptLargest(
    absl::string_view file_path) {
  auto files = listFiles(file_path);
  if (!files.ok()) {
    return files.status();
  }
  auto largest = findLargestFileId(files.value());
  if (!largest.ok()) {
    return largest.status();
  }
  // listFiles() yields bare file names, so compare against the bare name of
  // the newest file rather than a full path.
  const std::string largest_file_name =
      absl::StrCat(kfileNamePrefix, "_", largest.value());
  const std::filesystem::path dir{std::string(file_path)};
  for (const std::string& entry : files.value()) {
    if (entry == largest_file_name) {
      continue;
    }
    // operator/ inserts the directory separator when it is missing.
    const std::filesystem::path del_path = dir / entry;
    std::error_code ec;
    if (!std::filesystem::remove(del_path, ec)) {
      LOG(ERROR) << "Failed to delete the file " << del_path.string() << " "
                 << ec.message();
    }
  }
  return absl::OkStatus();
}
// Empties the storage directory (dir_path_): every entry directly inside it
// is removed recursively, and each removal is logged as a warning.
//
// Returns:
//   FailedPreconditionError - dir_path_ does not exist.
//   UnavailableError        - an entry could not be removed; entries after it
//                             are left untouched.
absl::Status PersistentStorageManagerImpl::InitializeSavedActions() {
  const bool dir_present = std::filesystem::exists(dir_path_);
  if (!dir_present) {
    return absl::FailedPreconditionError(
        absl::StrCat("Directory does not exist: ", dir_path_));
  }

  for (const std::filesystem::directory_entry& entry :
       std::filesystem::directory_iterator(dir_path_)) {
    const std::filesystem::path target = entry.path();
    std::error_code remove_error;
    const std::uintmax_t removed =
        std::filesystem::remove_all(target, remove_error);
    // remove_all() reports failure by returning uintmax_t(-1).
    if (removed == static_cast<std::uintmax_t>(-1)) {
      return absl::UnavailableError(
          absl::StrCat("Failed to delete the file ", target.string(), ":",
                       remove_error.message()));
    }
    LOG(WARNING) << "Deleted " << target.string();
  }
  return absl::OkStatus();
}
// Builds the path of the log file with id `file_id` inside `file_path`:
// "<file_path>/<kfileNamePrefix>_<file_id>".
std::string PersistentStorageManagerImpl::MakeFileName(
    absl::string_view file_path, uint64_t file_id) {
  // StrCat formats the unsigned id as plain decimal, the same text
  // std::to_string produces.
  std::string full_name =
      absl::StrCat(file_path, "/", kfileNamePrefix, "_", file_id);
  return full_name;
}
} // namespace persistent_storage