| #include "disruption_manager.h" |
| |
| #include <iterator> |
| #include <list> |
| #include <utility> |
| |
| #include "daemon_context.h" |
| #include "absl/functional/any_invocable.h" |
| #include "absl/log/check.h" |
| #include "absl/log/log.h" |
| #include "absl/status/status.h" |
| #include "absl/strings/string_view.h" |
| #include "absl/synchronization/mutex.h" |
| #include "absl/time/time.h" |
| #include "source_location" |
| #include "bmc/status_macros.h" |
| |
| namespace safepower_agent { |
| |
| static constexpr absl::string_view kEndCallbacksTaskName = |
| "disruption_end_callbacks"; |
| static constexpr absl::string_view kStartCallbacksTaskName = |
| "disruption_start_callbacks"; |
| |
| absl::Status DisruptionManager::RunCallbacks( |
| std::list<DisruptionManager::DisruptionCallback>& list, |
| absl::Duration delay, std::source_location location) { |
| absl::string_view name; |
| if (&list == &disruption_start_callbacks_) { |
| name = kStartCallbacksTaskName; |
| } else if (&list == &disruption_end_callbacks_) { |
| name = kEndCallbacksTaskName; |
| } else { |
| return absl::InternalError("Unknown list"); |
| } |
| return DaemonContext::Get().scheduler().DelayCall( |
| [this, &list, location = std::move(location), name] { |
| absl::MutexLock lock(disruption_mutex_); |
| LOG(INFO).AtLocation(location.file_name(), location.line()) |
| << "Running " << name; |
| for (auto& callback : list) { |
| if (callback) { |
| std::move(callback)(); |
| } |
| } |
| LOG(INFO).AtLocation(location.file_name(), location.line()) |
| << "Done running " << name; |
| }, |
| delay, name); |
| } |
| |
| absl::Status DisruptionManager::CancelDisruption() { |
| absl::MutexLock lock(disruption_mutex_); |
| if (DaemonContext::Get().now() >= expect_disruption_until_) { |
| return absl::FailedPreconditionError("Not expecting a disruption"); |
| } |
| if (absl::Status status = |
| DaemonContext::Get().scheduler().CancelCall(kEndCallbacksTaskName); |
| !status.ok()) { |
| LOG(DFATAL) << "Failed to cancel disruption end callbacks: " << status; |
| } else if (absl::Status status = |
| RunCallbacks(disruption_end_callbacks_, absl::ZeroDuration()); |
| !status.ok()) { |
| LOG(DFATAL) << "Failed to run disruption end callbacks: " << status; |
| } |
| LOG(INFO) << "Disruption canceled"; |
| expect_disruption_until_ = absl::InfinitePast(); |
| return absl::OkStatus(); |
| } |
| |
| absl::Status DisruptionManager::ExpectDisruptionIn(absl::Duration timeout) { |
| absl::MutexLock lock(disruption_mutex_); |
| if (expect_disruption_until_ >= DaemonContext::Get().now()) { |
| return absl::FailedPreconditionError( |
| "Disruption already expected by " + |
| absl::FormatTime(expect_disruption_until_)); |
| } |
| RETURN_IF_ERROR(RunCallbacks(disruption_end_callbacks_, timeout)); |
| absl::Status status = |
| RunCallbacks(disruption_start_callbacks_, absl::ZeroDuration()); |
| if (!status.ok()) { |
| absl::Status cancel_status = |
| DaemonContext::Get().scheduler().CancelCall(kStartCallbacksTaskName); |
| if (!cancel_status.ok()) { |
| LOG(DFATAL) << "Failed to cancel disruption start callbacks: " |
| << cancel_status; |
| } |
| return status; |
| } |
| expect_disruption_until_ = DaemonContext::Get().now() + timeout; |
| LOG(INFO) << "Disruption expected by " |
| << absl::FormatTime(expect_disruption_until_); |
| return absl::OkStatus(); |
| } |
| |
| absl::Duration DisruptionManager::PendingDisruption() const { |
| absl::MutexLock lock(disruption_mutex_); |
| absl::Time now = DaemonContext::Get().now(); |
| if (expect_disruption_until_ > now) { |
| return expect_disruption_until_ - now; |
| } |
| return absl::ZeroDuration(); |
| } |
| |
| DisruptionManager::CallbackHandle DisruptionManager::OnDisruptionStart( |
| DisruptionCallback callback) { |
| absl::MutexLock lock(disruption_mutex_); |
| return OnDisruptionStartHelper(std::move(callback)); |
| } |
| |
| DisruptionManager::CallbackHandle DisruptionManager::OnDisruptionEnd( |
| DisruptionCallback callback) { |
| absl::MutexLock lock(disruption_mutex_); |
| return OnDisruptionEndHelper(std::move(callback)); |
| } |
| |
| DisruptionManager::CallbackHandle DisruptionManager::OnDisruptionStartHelper( |
| DisruptionCallback callback) { |
| disruption_start_callbacks_.push_back(std::move(callback)); |
| return CallbackHandle({}, disruption_mutex_, disruption_start_callbacks_, |
| std::prev(disruption_start_callbacks_.end())); |
| } |
| |
| DisruptionManager::CallbackHandle DisruptionManager::OnDisruptionEndHelper( |
| DisruptionCallback callback) { |
| disruption_end_callbacks_.push_front(std::move(callback)); |
| return CallbackHandle({}, disruption_mutex_, disruption_end_callbacks_, |
| disruption_end_callbacks_.begin()); |
| } |
| |
| DisruptionManager::CallbackHandle::CallbackHandle( |
| PrivateToken, absl::Mutex& mutex, std::list<DisruptionCallback>& list, |
| std::list<DisruptionCallback>::iterator it) |
| : mutex_(&mutex), list_(&list), it_(it) {} |
| |
| DisruptionManager::CallbackHandle::~CallbackHandle() { |
| if (list_ == nullptr) return; |
| CHECK_NE(mutex_, nullptr); |
| absl::MutexLock lock(*mutex_); |
| list_->erase(it_); |
| } |
| |
| DisruptionManager::CallbackHandle::CallbackHandle(CallbackHandle&& other) { |
| mutex_ = other.mutex_; |
| list_ = other.list_; |
| it_ = other.it_; |
| other.mutex_ = nullptr; |
| other.list_ = nullptr; |
| } |
| |
| DisruptionManager::CallbackHandle& DisruptionManager::CallbackHandle::operator=( |
| CallbackHandle&& other) { |
| mutex_ = other.mutex_; |
| list_ = other.list_; |
| it_ = other.it_; |
| other.mutex_ = nullptr; |
| other.list_ = nullptr; |
| return *this; |
| } |
| |
| } // namespace safepower_agent |