#include "persistent_storage_impl.h"

#include <algorithm>
#include <cstdint>
#include <filesystem>  // NOLINT(build/c++17)
#include <fstream>
#include <ios>
#include <iterator>
#include <ostream>
#include <string>
#include <system_error>  // NOLINT(build/c++11)
#include <vector>

#include "google/protobuf/timestamp.pb.h"
#include "action_context.h"
#include "safepower_agent.pb.h"
#include "safepower_agent_config.pb.h"
#include "state_persistence.pb.h"
#include "proto_reader.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "absl/types/span.h"

namespace persistent_storage {

using safepower_agent::ActionContext;
using safepower_agent_persistence_proto::SavedAction;
using safepower_agent_persistence_proto::SavedActionAppendLog;
using safepower_agent_persistence_proto::SavedActionRecord;
using safepower_agent_persistence_proto::SavedActions;

// Builds the storage manager from its configuration.
//
// Each setting is applied only when the config carries a usable value (a
// non-empty directory path, a positive size / count); otherwise the matching
// member keeps the value it already had (presumably a default declared in the
// header -- not visible from this file).
PersistentStorageManagerImpl::PersistentStorageManagerImpl(
    const safepower_agent_config::PersistentStorageConfig& config) {
  const auto& configured_dir = config.dir_path();
  if (!configured_dir.empty()) dir_path_ = configured_dir;

  const auto configured_max_size = config.max_file_size();
  if (configured_max_size > 0) max_file_size_ = configured_max_size;

  const auto configured_keep_number = config.proto_keep_number();
  if (configured_keep_number > 0) proto_keep_number_ = configured_keep_number;
}

// Convenience overload: persists `actions` into the directory stored in
// dir_path_ by forwarding to the two-argument overload.
absl::Status PersistentStorageManagerImpl::WriteSavedActionsChange(
    const SavedActions& actions) {
  const absl::Status write_status = WriteSavedActionsChange(actions, dir_path_);
  return write_status;
}

// Appends the given actions to the newest append-log file in `dir_path` and
// compacts the log when it has grown too large.
//
// Steps:
//  1. Every (action id, action) entry of `actions` becomes one record.
//  2. The serialized records are appended to the file with the largest id
//     found in `dir_path` (opening in append mode creates it if missing).
//  3. If that file is now larger than max_file_size_, it is read back, the
//     records are merged per action id, the result is trimmed with
//     keepMostRecentProtos(), written to the file with the next id, and
//     deleteAllFilesExceptLargest() is called.
//
// Returns a non-OK status when listing the directory, serializing, writing,
// or reading the log back fails.
absl::Status PersistentStorageManagerImpl::WriteSavedActionsChange(
    const SavedActions& actions, absl::string_view dir_path) {
  SavedActionAppendLog change_log;
  for (const auto& entry : actions.actions()) {
    SavedActionRecord* record = change_log.add_records();
    record->set_action_id(entry.first);
    *(record->mutable_actions()) = entry.second;
  }
  std::string serialized_change;
  if (!change_log.SerializeToString(&serialized_change)) {
    return absl::UnavailableError("Failed to Serialize the change record");
  }
  auto files = listFiles(dir_path);
  if (!files.ok()) {
    return files.status();
  }
  auto id = findLargestFileId(files.value());
  if (!id.ok()) {
    return id.status();
  }

  std::string file_path = MakeFileName(dir_path, id.value());

  // Append the new records and close the file before anything else happens,
  // so the data is always on disk before the file is collated. Open, write
  // and flush failures are reported instead of being silently dropped.
  {
    std::ofstream append_file(file_path,
                              std::ios_base::app | std::ios::binary);
    if (!append_file.is_open()) {
      return absl::UnavailableError(
          absl::StrCat("Failed to open file for appending: ", file_path));
    }
    append_file.write(serialized_change.data(),
                      static_cast<std::streamsize>(serialized_change.size()));
    append_file.close();
    if (append_file.fail()) {
      return absl::UnavailableError(
          absl::StrCat("Failed to append to file: ", file_path));
    }
  }

  // If the file is too large, merge its records and start a new file.
  std::error_code ec;
  std::uintmax_t file_size = std::filesystem::file_size(file_path, ec);
  if (ec) {
    return absl::UnavailableError(
        absl::StrCat("Failed to get file size", ec.message()));
  }
  if (file_size <= max_file_size_) {
    return absl::OkStatus();
  }

  LOG(INFO) << "file size too large merging" << std::endl;
  absl::StatusOr<SavedActionAppendLog> append =
      proto_reader::ReadProto<SavedActionAppendLog>(file_path);
  if (!append.ok()) {
    return append.status();
  }

  SavedActions merged_actions;
  absl::Status status = this->ConvertAppendToMap(merged_actions, *append);
  if (!status.ok()) {
    return status;
  }
  // Only keep the most recent protos.
  absl::Status keep_status = keepMostRecentProtos(merged_actions);
  if (!keep_status.ok()) {
    return keep_status;
  }
  // Convert the map back into an append log so it can be saved again.
  SavedActionAppendLog compacted_log;
  for (const auto& entry : merged_actions.actions()) {
    SavedActionRecord* record = compacted_log.add_records();
    record->set_action_id(entry.first);
    *(record->mutable_actions()) = entry.second;
  }

  std::string next_file = MakeFileName(dir_path, id.value() + 1);
  std::ofstream compacted_file(next_file, std::ios::binary);
  // A half-written file with a higher id would be picked up as the newest
  // file on the next call, so it is removed (best effort) on failure.
  const auto discard_partial_file = [&compacted_file, &next_file]() {
    compacted_file.close();
    std::error_code remove_ec;
    std::filesystem::remove(next_file, remove_ec);
  };
  if (!compacted_log.SerializeToOstream(&compacted_file)) {
    discard_partial_file();
    return absl::InternalError("Failed to Serialize the change");
  }
  // Close first so that errors raised while flushing are detected too.
  compacted_file.close();
  if (compacted_file.fail()) {
    discard_partial_file();
    return absl::UnavailableError("Failed to write to file");
  }
  return deleteAllFilesExceptLargest(dir_path);
}

// Folds the records of `log` into `map`, keyed by action id.
//
// The first record seen for an id is copied into the map; every further
// record with the same id is merged into the existing entry with MergeFrom().
// Always returns OkStatus.
absl::Status PersistentStorageManagerImpl::ConvertAppendToMap(
    SavedActions& map, SavedActionAppendLog& log) {
  auto* actions_by_id = map.mutable_actions();
  for (const auto& record : log.records()) {
    const auto emplaced =
        actions_by_id->try_emplace(record.action_id(), record.actions());
    const bool newly_inserted = emplaced.second;
    if (newly_inserted) {
      continue;
    }
    // The id is already present: merge this record into the stored action.
    emplaced.first->second.MergeFrom(record.actions());
  }
  return absl::OkStatus();
}

// Convenience overload: reads the saved actions from the directory stored in
// dir_path_.
//
// This is the same directory the single-argument WriteSavedActionsChange()
// writes to, so a directory supplied through the configuration is honoured
// on the read side as well.
// NOTE(review): assumes dir_path_ holds the default path when the config
// leaves it empty (the member's initializer is not visible here) -- confirm.
absl::StatusOr<SavedActions> PersistentStorageManagerImpl::ReadSavedActions() {
  return this->ReadSavedActions(dir_path_);
}

// Loads the saved actions stored in `dir_path`.
//
// The file with the largest id is read first. If it cannot be read and its id
// is not 0, the file with the previous id is tried once; when that fails as
// well, the status of the first read is returned. The records that were read
// are then folded into a SavedActions map.
absl::StatusOr<SavedActions> PersistentStorageManagerImpl::ReadSavedActions(
    absl::string_view dir_path) {
  auto dir_entries = listFiles(dir_path);
  if (!dir_entries.ok()) return dir_entries.status();

  auto newest_id = findLargestFileId(*dir_entries);
  if (!newest_id.ok()) return newest_id.status();

  std::string newest_path = MakeFileName(dir_path, *newest_id);
  auto append_log = proto_reader::ReadProto<SavedActionAppendLog>(newest_path);

  if (!append_log.ok()) {
    LOG(ERROR) << "Unable to read SavedActions proto file file_path:"
               << newest_path << append_log.status();
    // Remember the first failure: it is what the caller gets back if the
    // fallback below does not succeed either.
    const absl::Status first_failure = append_log.status();
    if (*newest_id == 0) {
      // There is no older file to fall back to.
      return first_failure;
    }

    std::string previous_path = MakeFileName(dir_path, *newest_id - 1);
    LOG(ERROR) << "Trying to read older state:" << previous_path;
    append_log = proto_reader::ReadProto<SavedActionAppendLog>(previous_path);
    if (!append_log.ok()) {
      // Log the status of the second read, report the first one.
      LOG(ERROR) << "reading older status failed, (probably file not present)"
                 << append_log.status();
      return first_failure;
    }
  }

  SavedActions merged;
  const absl::Status convert_status = ConvertAppendToMap(merged, *append_log);
  if (!convert_status.ok()) return convert_status;
  return merged;
}

// Returns the time of the last recorded state change of `action`, i.e. the
// changed_at value of the final history entry.
//
// When the action has no state log or an empty history, a timestamp 24 hours
// before the current time is returned instead (seconds only, nanos unset).
google::protobuf::Timestamp getChangeTime(const SavedAction& action) {
  const bool has_history = action.has_action_state_log() &&
                           !action.action_state_log().history().empty();
  if (has_history) {
    return action.action_state_log().history().rbegin()->changed_at();
  }

  // No history available: pretend the action last changed one day ago.
  google::protobuf::Timestamp fallback;
  fallback.set_seconds(absl::ToUnixSeconds(absl::Now() - absl::Hours(24)));
  return fallback;
}

// Reports whether `action` has reached a final state according to
// ActionContext::IsFinalState().
//
// An action without a state log or without a current state is never treated
// as final; the time-based ordering is what eventually clears such entries.
bool AtEndState(const SavedAction& action) {
  const bool has_current_state =
      action.has_action_state_log() &&
      action.action_state_log().has_current_state();
  return has_current_state &&
         ActionContext::IsFinalState(
             action.action_state_log().current_state());
}

// Trims `map` when the persisted log is cleaned up so that exactly
// proto_keep_number_ entries remain (nothing is removed when the map already
// holds that many entries or fewer).
//
// Entries are removed in this order until only proto_keep_number_ are left:
//  1. actions that have reached an end state, oldest change first;
//  2. actions that have not reached an end state, oldest change first.
// The most recently changed actions are therefore the ones that survive, and
// unfinished actions are preferred over finished ones.
//
// Always returns OkStatus.
absl::Status PersistentStorageManagerImpl::keepMostRecentProtos(
    SavedActions& map) {
  const std::size_t total = map.actions().size();
  const std::size_t keep = static_cast<std::size_t>(proto_keep_number_);
  if (total < keep) {
    LOG(WARNING) << " Assumptions about the size of the"
                 << " saved protos are wrong" << std::endl
                 << "The number of saved protos is " << map.actions().size()
                 << "less than the number we want to keep "
                 << proto_keep_number_;
    return absl::OkStatus();
  }
  if (total == keep) {
    // Exactly at the limit: there is nothing to erase.
    LOG(INFO) << "keeping all protos";
    return absl::OkStatus();
  }

  // Sort key of one map entry. The key is computed once per entry because
  // getChangeTime() falls back to a value derived from the current time, and
  // re-evaluating it inside the comparator could make the ordering
  // inconsistent.
  struct Candidate {
    SavedActionRecord record;  // only action_id is filled in
    bool at_end_state = false;
    int64_t seconds = 0;
    int32_t nanos = 0;
  };
  std::vector<Candidate> candidates;
  candidates.reserve(total);
  for (const auto& entry : map.actions()) {
    Candidate& candidate = candidates.emplace_back();
    candidate.record.set_action_id(entry.first);
    candidate.at_end_state = AtEndState(entry.second);
    const google::protobuf::Timestamp changed_at = getChangeTime(entry.second);
    candidate.seconds = changed_at.seconds();
    candidate.nanos = changed_at.nanos();
  }

  // Deletion candidates go to the front: finished actions before unfinished
  // ones, then ascending by change time. This is a strict weak ordering, as
  // std::stable_sort requires.
  std::stable_sort(candidates.begin(), candidates.end(),
                   [](const Candidate& a, const Candidate& b) {
                     if (a.at_end_state != b.at_end_state) {
                       return a.at_end_state;
                     }
                     if (a.seconds != b.seconds) {
                       return a.seconds < b.seconds;
                     }
                     return a.nanos < b.nanos;
                   });

  // Erase from the head of the sorted list, keeping the last
  // proto_keep_number_ entries.
  const std::size_t delete_count = total - keep;
  LOG(INFO) << "cleaning up the saved protos";
  LOG(INFO) << "current:" << total;
  LOG(INFO) << "keeping:" << proto_keep_number_;
  LOG(INFO) << "deleting:" << delete_count;
  for (std::size_t index = 0; index < delete_count; ++index) {
    const auto& action_id = candidates[index].record.action_id();
    LOG(INFO) << " proto:" << absl::StrCat(map.actions().at(action_id));
    map.mutable_actions()->erase(action_id);
  }
  return absl::OkStatus();
}

// Finds the largest file id among the given file names and returns it.
//
// A file name has the form "<kfileNamePrefix>_<id>", for example
//   "savedactions_1"
//   "savedactions_2"
// where the largest id denotes the most recent file. Names that do not split
// into exactly two '_'-separated parts, whose second part is not an unsigned
// integer, or whose first part is not kfileNamePrefix are skipped with a
// warning.
//
// Returns 0 when no name matches, which is indistinguishable from a file
// with id 0 being the newest. The returned status is always OK.
absl::StatusOr<uint64_t> PersistentStorageManagerImpl::findLargestFileId(
    absl::Span<const std::string> files) {
  uint64_t largest_id = 0;
  for (const std::string& entry : files) {
    const std::vector<absl::string_view> file_name_parts =
        absl::StrSplit(entry, '_');
    if (file_name_parts.size() != 2) {
      LOG(WARNING) << "unable to parse the file name in the directory " << entry
                   << " size:" << file_name_parts.size();
      continue;
    }
    uint64_t file_id = 0;
    if (!absl::SimpleAtoi(file_name_parts[1], &file_id)) {
      LOG(WARNING) << "unable to parse the file ID as unint64_t" << entry;
      continue;
    }
    if (file_name_parts[0] != kfileNamePrefix) {
      LOG(WARNING) << "unknown file name prefix" << entry;
      continue;
    }
    largest_id = std::max(largest_id, file_id);
  }
  return largest_id;
}

// Lists the names (final path component only, no directory part) of all
// entries directly inside the directory `file_path`.
//
// Returns FailedPreconditionError when `file_path` does not exist or is not a
// directory, and UnavailableError when the directory cannot be iterated.
// Only the non-throwing std::filesystem overloads are used, so file system
// problems are reported through the returned status rather than as
// exceptions.
absl::StatusOr<std::vector<std::string> >
PersistentStorageManagerImpl::listFiles(absl::string_view file_path) {
  const std::filesystem::path directory{std::string(file_path)};
  std::error_code ec;
  const bool exists = std::filesystem::exists(directory, ec);
  const bool is_directory =
      exists && std::filesystem::is_directory(directory, ec);
  if (!exists || !is_directory) {
    LOG(ERROR) << "Failed to read the directory " << file_path << exists << ":"
               << is_directory;
    return absl::FailedPreconditionError("Failed to find the directory");
  }

  std::filesystem::directory_iterator entry(directory, ec);
  if (ec) {
    return absl::UnavailableError(
        absl::StrCat("Failed to open the directory: ", ec.message()));
  }
  std::vector<std::string> files;
  const std::filesystem::directory_iterator end;
  while (entry != end) {
    files.push_back(entry->path().filename().string());
    entry.increment(ec);
    if (ec) {
      return absl::UnavailableError(
          absl::StrCat("Failed to iterate the directory: ", ec.message()));
    }
  }
  return files;
}

// Deletes every entry in the directory `file_path` except the saved-actions
// file with the largest id.
//
// Paths are built as "<file_path>/<entry>", the same layout MakeFileName()
// produces, so the newest file is recognised by comparing full paths.
// Deletion is best effort: an entry that cannot be removed is logged and
// skipped. A non-OK status is returned only when the directory cannot be
// listed.
absl::Status PersistentStorageManagerImpl::deleteAllFilesExceptLargest(
    absl::string_view file_path) {
  auto files = listFiles(file_path);
  if (!files.ok()) {
    return files.status();
  }
  auto largest = findLargestFileId(files.value());
  if (!largest.ok()) {
    return largest.status();
  }

  const std::string newest_path = MakeFileName(file_path, largest.value());
  for (const std::string& entry : files.value()) {
    const std::string del_path = absl::StrCat(file_path, "/", entry);
    if (del_path == newest_path) {
      continue;  // never delete the most recent file
    }
    // The non-throwing overload keeps a single failing entry (for example a
    // non-empty sub-directory) from aborting the whole clean-up.
    std::error_code ec;
    if (!std::filesystem::remove(del_path, ec)) {
      LOG(ERROR) << "Failed to delete the file " << del_path
                 << (ec ? absl::StrCat(": ", ec.message()) : std::string());
    }
  }
  return absl::OkStatus();
}

// Builds the path of the saved-actions file with the given id:
// "<file_path>/<kfileNamePrefix>_<file_id>".
std::string PersistentStorageManagerImpl::MakeFileName(
    absl::string_view file_path, uint64_t file_id) {
  const std::string base_name = absl::StrCat(kfileNamePrefix, "_", file_id);
  return absl::StrCat(file_path, "/", base_name);
}
}  // namespace persistent_storage
