blob: b6e248c269a5ae95c5d8701a7c21976dad8fa38f [file] [log] [blame] [edit]
#include "disruption_manager.h"
#include <iterator>
#include <list>
#include <utility>
#include "daemon_context.h"
#include "absl/functional/any_invocable.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "source_location"
#include "bmc/status_macros.h"
namespace safepower_agent {
// Scheduler task name under which the disruption-end callbacks are queued;
// also the name passed to CancelCall when that task is cancelled.
static constexpr absl::string_view kEndCallbacksTaskName{
    "disruption_end_callbacks"};

// Scheduler task name under which the disruption-start callbacks are queued.
static constexpr absl::string_view kStartCallbacksTaskName{
    "disruption_start_callbacks"};
// Queues a scheduler task that invokes every callback stored in `list`.
//
// `list` must be one of this object's two callback lists; which one it is
// (decided by address) selects the scheduler task name. Any other list is
// rejected with an InternalError and nothing is scheduled.
//
// `delay` is forwarded to the scheduler's DelayCall, and `location` is the
// source position attached to the two log lines the task emits.
//
// Returns the status reported by DelayCall.
absl::Status DisruptionManager::RunCallbacks(
    std::list<DisruptionManager::DisruptionCallback>& list,
    absl::Duration delay, std::source_location location) {
  const bool is_start_list = (&list == &disruption_start_callbacks_);
  const bool is_end_list = (&list == &disruption_end_callbacks_);
  if (!is_start_list && !is_end_list) {
    return absl::InternalError("Unknown list");
  }
  const absl::string_view task_name =
      is_start_list ? kStartCallbacksTaskName : kEndCallbacksTaskName;

  // The task captures `this` and a reference to the member list, so it relies
  // on this object still being alive when the scheduler runs it.
  auto task = [this, &list, location, task_name] {
    // Callbacks run with the disruption mutex held.
    absl::MutexLock lock(disruption_mutex_);
    LOG(INFO).AtLocation(location.file_name(), location.line())
        << "Running " << task_name;
    for (auto& entry : list) {
      if (!entry) continue;  // Skip empty callbacks.
      // Invoked as an rvalue, matching how the callback type is called here.
      std::move(entry)();
    }
    LOG(INFO).AtLocation(location.file_name(), location.line())
        << "Done running " << task_name;
  };
  return DaemonContext::Get().scheduler().DelayCall(std::move(task), delay,
                                                    task_name);
}
// Cancels the currently expected disruption.
//
// Returns FailedPreconditionError when the current time is at or past
// `expect_disruption_until_`. Otherwise the pending end-callback task is
// cancelled and, if that succeeds, the end callbacks are scheduled to run with
// zero delay. Failures of either step are only logged (DFATAL); the expected
// disruption window is cleared and OkStatus is returned regardless.
absl::Status DisruptionManager::CancelDisruption() {
  absl::MutexLock lock(disruption_mutex_);

  const bool disruption_pending =
      DaemonContext::Get().now() < expect_disruption_until_;
  if (!disruption_pending) {
    return absl::FailedPreconditionError("Not expecting a disruption");
  }

  const absl::Status cancel_status =
      DaemonContext::Get().scheduler().CancelCall(kEndCallbacksTaskName);
  if (cancel_status.ok()) {
    // The delayed end-callback task is gone, so schedule the end callbacks to
    // run with no delay instead.
    const absl::Status run_status =
        RunCallbacks(disruption_end_callbacks_, absl::ZeroDuration());
    if (!run_status.ok()) {
      LOG(DFATAL) << "Failed to run disruption end callbacks: " << run_status;
    }
  } else {
    LOG(DFATAL) << "Failed to cancel disruption end callbacks: "
                << cancel_status;
  }

  LOG(INFO) << "Disruption canceled";
  expect_disruption_until_ = absl::InfinitePast();
  return absl::OkStatus();
}
// Announces that a disruption is expected within `timeout`.
//
// Returns FailedPreconditionError if a previously announced disruption window
// has not yet elapsed (the comparison is inclusive: a window ending exactly
// "now" still counts as active). Otherwise:
//   1. the end callbacks are scheduled to run after `timeout`;
//   2. the start callbacks are scheduled to run with zero delay;
//   3. `expect_disruption_until_` is set to now + timeout.
//
// If step 1 fails its error is returned and nothing else happens. If step 2
// fails, the end-callback task queued in step 1 is cancelled again so it does
// not fire for a disruption that was never announced, and the step 2 error is
// returned.
absl::Status DisruptionManager::ExpectDisruptionIn(absl::Duration timeout) {
  absl::MutexLock lock(disruption_mutex_);
  if (expect_disruption_until_ >= DaemonContext::Get().now()) {
    return absl::FailedPreconditionError(
        "Disruption already expected by " +
        absl::FormatTime(expect_disruption_until_));
  }
  RETURN_IF_ERROR(RunCallbacks(disruption_end_callbacks_, timeout));
  absl::Status status =
      RunCallbacks(disruption_start_callbacks_, absl::ZeroDuration());
  if (!status.ok()) {
    // Roll back: the only task queued at this point is the delayed
    // end-callback task, so that is the one that has to be cancelled.
    absl::Status cancel_status =
        DaemonContext::Get().scheduler().CancelCall(kEndCallbacksTaskName);
    if (!cancel_status.ok()) {
      LOG(DFATAL) << "Failed to cancel disruption end callbacks: "
                  << cancel_status;
    }
    return status;
  }
  expect_disruption_until_ = DaemonContext::Get().now() + timeout;
  LOG(INFO) << "Disruption expected by "
            << absl::FormatTime(expect_disruption_until_);
  return absl::OkStatus();
}
// Returns how much of the expected disruption window is left, or
// absl::ZeroDuration() when `expect_disruption_until_` is not in the future.
absl::Duration DisruptionManager::PendingDisruption() const {
  absl::MutexLock lock(disruption_mutex_);
  const absl::Time current = DaemonContext::Get().now();
  return expect_disruption_until_ > current
             ? expect_disruption_until_ - current
             : absl::ZeroDuration();
}
// Registers `callback` in the disruption-start list.
//
// Takes `disruption_mutex_` and delegates to OnDisruptionStartHelper, which
// does the insertion itself without locking. The returned handle removes the
// callback from the list when it is destroyed.
DisruptionManager::CallbackHandle DisruptionManager::OnDisruptionStart(
    DisruptionCallback callback) {
  absl::MutexLock guard(disruption_mutex_);
  return OnDisruptionStartHelper(std::move(callback));
}
// Registers `callback` in the disruption-end list.
//
// Takes `disruption_mutex_` and delegates to OnDisruptionEndHelper, which does
// the insertion itself without locking. The returned handle removes the
// callback from the list when it is destroyed.
DisruptionManager::CallbackHandle DisruptionManager::OnDisruptionEnd(
    DisruptionCallback callback) {
  absl::MutexLock guard(disruption_mutex_);
  return OnDisruptionEndHelper(std::move(callback));
}
// Appends `callback` to the back of the start-callback list and returns a
// handle that refers to the new element.
//
// Does no locking of its own; OnDisruptionStart calls it with
// `disruption_mutex_` already held.
DisruptionManager::CallbackHandle DisruptionManager::OnDisruptionStartHelper(
    DisruptionCallback callback) {
  // Inserting before end() appends and yields the iterator to the new node.
  auto inserted = disruption_start_callbacks_.insert(
      disruption_start_callbacks_.end(), std::move(callback));
  return CallbackHandle({}, disruption_mutex_, disruption_start_callbacks_,
                        inserted);
}
// Prepends `callback` to the front of the end-callback list and returns a
// handle that refers to the new element. Because new entries go to the front,
// end callbacks are stored in reverse registration order.
//
// Does no locking of its own; OnDisruptionEnd calls it with
// `disruption_mutex_` already held.
DisruptionManager::CallbackHandle DisruptionManager::OnDisruptionEndHelper(
    DisruptionCallback callback) {
  // Inserting before begin() prepends and yields the iterator to the new node.
  auto inserted = disruption_end_callbacks_.insert(
      disruption_end_callbacks_.begin(), std::move(callback));
  return CallbackHandle({}, disruption_mutex_, disruption_end_callbacks_,
                        inserted);
}
DisruptionManager::CallbackHandle::CallbackHandle(
PrivateToken, absl::Mutex& mutex, std::list<DisruptionCallback>& list,
std::list<DisruptionCallback>::iterator it)
: mutex_(&mutex), list_(&list), it_(it) {}
// Unregisters the callback this handle refers to.
//
// A handle whose `list_` is null (for example one that has been moved from)
// does nothing. Otherwise the element at `it_` is erased from the list while
// holding the associated mutex.
DisruptionManager::CallbackHandle::~CallbackHandle() {
  if (list_ != nullptr) {
    // A handle with a list must also have a mutex.
    CHECK_NE(mutex_, nullptr);
    absl::MutexLock lock(*mutex_);
    list_->erase(it_);
  }
}
// Move constructor: takes over the registration held by `other`.
//
// `other` is left with null `mutex_` and `list_`, so its destructor becomes a
// no-op and the callback is erased only once, by the new handle.
DisruptionManager::CallbackHandle::CallbackHandle(CallbackHandle&& other)
    : mutex_(std::exchange(other.mutex_, nullptr)),
      list_(std::exchange(other.list_, nullptr)),
      it_(other.it_) {}
// Move assignment: drops whatever registration this handle currently holds and
// takes over the one held by `other`.
//
// - Self-assignment is a no-op, so the handle keeps its registration instead
//   of nulling itself out and leaving its callback in the list.
// - If this handle already refers to a callback, that callback is erased from
//   its list first (under the handle's current mutex, as the destructor does),
//   so it is not left in the list with no handle able to remove it.
//
// `other` is left with null `mutex_` and `list_`, making its destructor a
// no-op.
DisruptionManager::CallbackHandle& DisruptionManager::CallbackHandle::operator=(
    CallbackHandle&& other) {
  if (this == &other) {
    return *this;
  }
  if (list_ != nullptr) {
    // Release the registration being overwritten before taking the new one.
    CHECK_NE(mutex_, nullptr);
    absl::MutexLock lock(*mutex_);
    list_->erase(it_);
  }
  mutex_ = std::exchange(other.mutex_, nullptr);
  list_ = std::exchange(other.list_, nullptr);
  it_ = other.it_;
  return *this;
}
} // namespace safepower_agent