| #include "action_context.h" |
| |
| #include <functional> |
| #include <memory> |
| #include <optional> |
| #include <string> |
| #include <utility> |
| |
| #include "google/protobuf/timestamp.pb.h" |
| #include "condition.h" |
| #include "convert_proto.h" |
| #include "daemon_context.h" |
| #include "safepower_agent.pb.h" |
| #include "state_persistence.pb.h" |
| #include "state_updater.h" |
| #include "absl/base/nullability.h" |
| #include "absl/functional/any_invocable.h" |
| #include "absl/functional/bind_front.h" |
| #include "absl/log/check.h" |
| #include "absl/log/log.h" |
| #include "absl/status/status.h" |
| #include "absl/strings/str_cat.h" |
| #include "absl/strings/str_format.h" |
| #include "absl/strings/string_view.h" |
| #include "absl/synchronization/mutex.h" |
| #include "absl/time/time.h" |
| #include "bmc/status_macros.h" |
| |
| namespace safepower_agent { |
| |
| using ::safepower_agent_persistence_proto::SavedActions; |
| using ::safepower_agent_proto::ActionState; |
| using ::safepower_agent_proto::ActionStateChange; |
| using ::safepower_agent_proto::ActionStateLog; |
| |
| ActionStateLog ActionContext::NewInitialState() { |
| ActionStateLog initial_state; |
| initial_state.set_epoch_ms(DaemonContext::Get().epoch_ms()); |
| initial_state.set_current_state(safepower_agent_proto::ACTION_STATE_INIT); |
| SetTimestampToNow(*initial_state.add_history()->mutable_changed_at()); |
| return initial_state; |
| } |
| |
| static absl::Time GetStartTime( |
| const safepower_agent_proto::ActionStateLog& state) { |
| if (state.history_size() == 0) [[unlikely]] { |
| LOG(DFATAL) << "No start time in initial state"; |
| return DaemonContext::Get().now(); |
| } |
| return ConvertTime(state.history(0).changed_at()); |
| } |
| |
| ActionContext::ActionContext(CreationToken /*token*/, |
| ActionContextManager& manager, |
| std::string action_id, |
| safepower_agent_proto::StartActionRequest request, |
| Action action_impl, ActionStateLog initial_state) |
| |
| : manager_(manager), |
| request_(std::move(request)), |
| action_impl_(std::move(action_impl)), |
| precondition_( |
| request_.has_precondition() |
| ? std::make_optional<Condition>(request_.precondition(), |
| GetStartTime(initial_state)) |
| : std::nullopt), |
| validation_(request_.has_validation() |
| ? std::make_optional<Condition>( |
| request_.validation(), GetStartTime(initial_state)) |
| : std::nullopt), |
| action_state_updater_( |
| std::make_shared<StateUpdater<ActionStateLog>>(initial_state)), |
| action_id_(std::move(action_id)) {} |
| |
| absl::Status ActionContext::StartCheckingCondition( |
| Condition& condition, |
| absl::AnyInvocable<void(absl::Status, Condition::MatchList)> callback) { |
| absl::Status status = condition.WaitForMatch(execution_task_name(), |
| manager_.system_state_updater(), |
| std::move(callback)); |
| if (!status.ok()) { |
| LOG(ERROR) << "Failed to wait for condition for " << action_id_ << ": " |
| << status; |
| ActionStateChange change; |
| SetStatus(*change.mutable_status(), status); |
| SetState(ActionState::ACTION_STATE_ERROR, std::move(change)); |
| Finish(std::move(action_impl_)); |
| return status; |
| } |
| return absl::OkStatus(); |
| } |
| |
| absl::Status ActionContext::Activate() { |
| absl::MutexLock lock(&mutex_); |
| |
| switch (action_state_updater_->state().current_state()) { |
| default: |
| return absl::InvalidArgumentError( |
| absl::StrCat("Action state is unknown:", |
| safepower_agent_proto::ActionState_Name( |
| action_state_updater_->state().current_state()))); |
| case safepower_agent_proto::ACTION_STATE_INIT: { |
| RETURN_IF_ERROR(EnterStateInit()); |
| break; |
| } |
| case safepower_agent_proto::ACTION_STATE_CHECKING_PRECONDITION: |
| if (precondition_.has_value()) { |
| return EnterStateCheckingPrecondition(); |
| } |
| |
| LOG(WARNING) |
| << action_id_ |
| << " has no precondition but is in checking precondition state"; |
| SetState(ActionState::ACTION_STATE_RUNNING_ACTION); |
| |
| break; |
| case safepower_agent_proto::ACTION_STATE_RUNNING_ACTION: |
| return EnterStateRunningAction(); |
| case safepower_agent_proto::ACTION_STATE_VALIDATING_FINAL_STATE: { |
| if (validation_.has_value()) { |
| return EnterStateValidatingFinalState(); |
| } |
| |
| ActionStateChange change; |
| SetStatus(*change.mutable_status(), |
| absl::InvalidArgumentError( |
| "No validation specified in validating state")); |
| LOG(WARNING) << action_id_ |
| << " has no validation but is in validating state"; |
| SetState(ActionState::ACTION_STATE_ERROR, std::move(change)); |
| break; |
| } |
| case safepower_agent_proto::ACTION_STATE_ERROR: |
| case safepower_agent_proto::ACTION_STATE_SUCCESS: |
| break; |
| } |
| |
| return absl::OkStatus(); |
| } |
| |
| ActionContext::~ActionContext() { |
| // Always try to cancel. This will fail if the action is already finished or |
| // has not started yet, which is fine. |
| do { |
| absl::Status cancel_status = |
| DaemonContext::Get().scheduler().CancelCall(execution_task_name()); |
| if (!cancel_status.ok() && !absl::IsNotFound(cancel_status)) { |
| LOG(DFATAL) << "Failed to cancel action " << action_id_ << ": " |
| << cancel_status; |
| continue; // Try again if cancel failed. This is not expected. |
| } |
| } while (false); |
| if (!mutex_.TryLock()) { |
| LOG(DFATAL) << "Action " << action_id_ << " is still running"; |
| return; |
| } |
| Action action_impl = std::move(action_impl_); |
| mutex_.Unlock(); |
| Finish(std::move(action_impl)); |
| } |
| |
| void ActionContext::Finish(Action action_impl) { |
| if (action_impl != nullptr) { |
| manager_.FinishAction(request_.action(), std::move(action_impl)); |
| } |
| } |
| |
| static void AssignConditions(ActionStateChange& change, |
| const Condition::MatchList& matches) { |
| for (absl::Nonnull<const safepower_agent_proto::Condition*> condition : |
| matches) { |
| *change.add_matching_conditions() = *condition; |
| } |
| } |
| |
| void ActionContext::SetState(safepower_agent_proto::ActionState new_state, |
| ActionStateChange change_info) { |
| using safepower_agent_proto::ActionStateChange; |
| LOG(INFO) << "SetState " << action_id_ << " " |
| << safepower_agent_proto::ActionState_Name(new_state) << ": " |
| << change_info; |
| ActionStateLog state_change; |
| state_change.set_epoch_ms(DaemonContext::Get().epoch_ms()); |
| state_change.set_current_state(new_state); |
| ActionStateChange* history_item = state_change.add_history(); |
| *history_item = std::move(change_info); |
| SetTimestampToNow(*history_item->mutable_changed_at()); |
| history_item->set_previous_state( |
| action_state_updater_->state().current_state()); |
| |
| bool is_final = false; |
| absl::Status status; |
| switch (new_state) { |
| default: |
| LOG(DFATAL) << "Unexpected state: " |
| << safepower_agent_proto::ActionState_Name(new_state); |
| break; |
| case safepower_agent_proto::ACTION_STATE_CHECKING_PRECONDITION: |
| status = EnterStateCheckingPrecondition(); |
| break; |
| case safepower_agent_proto::ACTION_STATE_RUNNING_ACTION: |
| status = EnterStateRunningAction(); |
| break; |
| case safepower_agent_proto::ACTION_STATE_VALIDATING_FINAL_STATE: |
| status = EnterStateValidatingFinalState(); |
| break; |
| case safepower_agent_proto::ACTION_STATE_SUCCESS: |
| case safepower_agent_proto::ACTION_STATE_ERROR: |
| status = DaemonContext::Get().scheduler().DelayCall( |
| [this, action_impl = std::move(action_impl_)]() mutable { |
| Finish(std::move(action_impl)); |
| }, |
| absl::ZeroDuration(), execution_task_name()); |
| is_final = true; |
| break; |
| } |
| if (!status.ok()) { |
| LOG(ERROR) << "Failed to enter state " |
| << safepower_agent_proto::ActionState_Name(new_state) << ": " |
| << status; |
| ActionStateChange error_change; |
| SetStatus(*error_change.mutable_status(), status); |
| if (action_state_updater_->state().current_state() == |
| ActionState::ACTION_STATE_ERROR && |
| new_state == ActionState::ACTION_STATE_ERROR) { |
| LOG(DFATAL) << "State change failed too many times"; |
| return; |
| } |
| SetState(ActionState::ACTION_STATE_ERROR, std::move(error_change)); |
| return; |
| } |
| action_state_updater_->UpdateState(state_change, is_final); |
| |
| SavedActions saved_actions; |
| auto [it, inserted] = |
| saved_actions.mutable_actions()->insert({action_id_, {}}); |
| CHECK(inserted); |
| *it->second.mutable_action_state_log() = std::move(state_change); |
| absl::Status write_status = |
| DaemonContext::Get().persistent_storage_manager().WriteSavedActionsChange( |
| saved_actions); |
| if (!write_status.ok()) { |
| LOG(ERROR) << "Failed to persist action state: " << write_status; |
| } |
| } |
| |
| absl::Status ActionContext::EnterStateInit() { |
| SavedActions saved_actions; |
| auto [it, inserted] = |
| saved_actions.mutable_actions()->insert({action_id_, {}}); |
| CHECK(inserted); |
| *it->second.mutable_original_request() = request_; |
| *it->second.mutable_action_state_log() = action_state_updater_->state(); |
| RETURN_IF_ERROR( |
| DaemonContext::Get().persistent_storage_manager().WriteSavedActionsChange( |
| saved_actions)); |
| return DaemonContext::Get().scheduler().DelayCall( |
| std::bind(&ActionContext::NextStateInit, this), absl::ZeroDuration(), |
| execution_task_name()); |
| } |
| |
| void ActionContext::NextStateInit() { |
| absl::MutexLock lock(&mutex_); |
| if (precondition_.has_value()) { |
| SetState(ActionState::ACTION_STATE_CHECKING_PRECONDITION); |
| } else { |
| SetState(ActionState::ACTION_STATE_RUNNING_ACTION); |
| } |
| } |
| |
| absl::Status ActionContext::EnterStateCheckingPrecondition() { |
| return StartCheckingCondition( |
| *precondition_, |
| absl::bind_front(&ActionContext::NextStatePreconditionMatched, this)); |
| } |
| |
| void ActionContext::NextStatePreconditionMatched(absl::Status status, |
| Condition::MatchList matches) { |
| absl::MutexLock lock(&mutex_); |
| ActionStateChange change; |
| AssignConditions(change, matches); |
| if (!status.ok()) { |
| LOG(ERROR) << "Precondition failed: " << status; |
| SetStatus(*change.mutable_status(), status); |
| SetState(safepower_agent_proto::ACTION_STATE_ERROR, std::move(change)); |
| return; |
| } |
| SetState(ActionState::ACTION_STATE_RUNNING_ACTION, std::move(change)); |
| } |
| |
| absl::Status ActionContext::EnterStateRunningAction() { |
| if (validation_.has_value()) { |
| auto [match_status, match_list] = |
| validation_->CheckState(manager_.system_state_updater()->state()); |
| if (!match_status.ok() || !match_list.empty()) { |
| return DaemonContext::Get().scheduler().DelayCall( |
| std::bind(&ActionContext::NextStateValidationCompleted, this, |
| match_status, std::move(match_list)), |
| absl::ZeroDuration(), execution_task_name()); |
| } |
| } |
| |
| return DaemonContext::Get().scheduler().DelayCall( |
| std::bind(&ActionContext::RunAction, this), absl::ZeroDuration(), |
| execution_task_name()); |
| } |
| |
| void ActionContext::RunAction() { |
| absl::MutexLock lock(&mutex_); |
| action_impl_(request_.action(), [this](absl::Status status) { |
| // NextStateActionRan must be called asynchronously to avoid deadlocks. |
| absl::Status delay_status = DaemonContext::Get().scheduler().DelayCall( |
| std::bind(&ActionContext::NextStateActionRan, this, status), |
| absl::ZeroDuration(), execution_task_name()); |
| if (!delay_status.ok()) { |
| LOG(DFATAL) << "Failed to delay next state: " << delay_status; |
| NextStateActionRan(status); |
| } |
| }); |
| } |
| |
| void ActionContext::NextStateActionRan(absl::Status status) { |
| absl::MutexLock lock(&mutex_); |
| if (!status.ok()) { |
| LOG(ERROR) << "Action failed: " << status; |
| ActionStateChange change; |
| SetStatus(*change.mutable_status(), status); |
| SetState(ActionState::ACTION_STATE_ERROR, std::move(change)); |
| return; |
| } |
| if (validation_.has_value()) { |
| SetState(ActionState::ACTION_STATE_VALIDATING_FINAL_STATE); |
| } else { |
| SetState(ActionState::ACTION_STATE_SUCCESS); |
| } |
| } |
| |
| absl::Status ActionContext::EnterStateValidatingFinalState() { |
| return StartCheckingCondition( |
| *validation_, |
| absl::bind_front(&ActionContext::NextStateValidationCompleted, this)); |
| } |
| |
| void ActionContext::NextStateValidationCompleted(absl::Status status, |
| Condition::MatchList matches) { |
| absl::MutexLock lock(&mutex_); |
| ActionStateChange change; |
| AssignConditions(change, matches); |
| if (!status.ok()) { |
| LOG(ERROR) << "Validation failed: " << status; |
| SetStatus(*change.mutable_status(), status); |
| SetState(ActionState::ACTION_STATE_ERROR, std::move(change)); |
| return; |
| } |
| SetState(ActionState::ACTION_STATE_SUCCESS, std::move(change)); |
| } |
| |
| absl::Status ActionContextManager::RegisterAction( |
| const safepower_agent_proto::Action& action, |
| ActionContext::Action action_impl) { |
| absl::MutexLock lock(&actions_mutex_); |
| auto [_, inserted] = actions_.try_emplace(action, std::move(action_impl)); |
| if (!inserted) { |
| return absl::AlreadyExistsError( |
| absl::StrCat("Action already registered: ", action)); |
| } |
| return absl::OkStatus(); |
| } |
| |
| absl::StatusOr<absl::Nonnull<std::unique_ptr<ActionContext>>> |
| ActionContextManager::ReloadActionContext( |
| std::string action_id, |
| safepower_agent_persistence_proto::SavedAction saved_action) { |
| Action action_impl = nullptr; |
| if (!ActionContext::IsFinalState( |
| saved_action.action_state_log().current_state())) { |
| ASSIGN_OR_RETURN(action_impl, |
| ReserveAction(saved_action.original_request().action())); |
| } |
| |
| auto action_context = std::make_unique<ActionContext>( |
| ActionContext::CreationToken{}, *this, std::move(action_id), |
| std::move(*saved_action.mutable_original_request()), |
| std::move(action_impl), |
| std::move(*saved_action.mutable_action_state_log())); |
| RETURN_IF_ERROR(action_context->Activate()); |
| return action_context; |
| } |
| |
| absl::Status ActionContextManager::LoadSavedActions() { |
| ASSIGN_OR_RETURN( |
| SavedActions saved_actions, |
| DaemonContext::Get().persistent_storage_manager().ReadSavedActions()); |
| absl::MutexLock lock(&actions_mutex_); |
| |
| for (auto& [action_id, saved_action] : *saved_actions.mutable_actions()) { |
| absl::StatusOr<std::unique_ptr<ActionContext>> action_context_or = |
| ReloadActionContext(action_id, std::move(saved_action)); |
| if (!action_context_or.ok()) { |
| LOG(ERROR) << "Failed to reload action: " << action_context_or.status(); |
| continue; |
| } |
| std::unique_ptr<ActionContext> action_context = |
| *std::move(action_context_or); |
| absl::string_view action_id_ref = action_context->action_id(); |
| auto [it, inserted] = |
| running_actions_.try_emplace(action_id_ref, std::move(action_context)); |
| if (!inserted) { |
| return absl::AlreadyExistsError( |
| absl::StrCat("Duplicate action ID: ", action_context->action_id())); |
| } |
| } |
| return absl::OkStatus(); |
| } |
| |
| absl::StatusOr<ActionContext::Action> ActionContextManager::ReserveAction( |
| const safepower_agent_proto::Action& action) { |
| auto found_action_impl = actions_.find(action); |
| if (found_action_impl == actions_.end()) { |
| return absl::NotFoundError(absl::StrCat("Action not found: ", action)); |
| } |
| if (found_action_impl->second == nullptr) { |
| return absl::FailedPreconditionError( |
| absl::StrCat("Action already started: ", action)); |
| } |
| return std::move(found_action_impl->second); |
| } |
| |
| std::string ActionContextManager::NextActionId() { |
| return absl::StrFormat("%u-%u", DaemonContext::Get().epoch_ms(), |
| next_action_id_++); |
| } |
| |
| absl::StatusOr<absl::Nonnull<ActionContext*>> ActionContextManager::StartAction( |
| safepower_agent_proto::StartActionRequest request) { |
| absl::MutexLock lock(&actions_mutex_); |
| ASSIGN_OR_RETURN(ActionContext::Action action_impl, |
| ReserveAction(request.action())); |
| auto action_context = std::make_unique<ActionContext>( |
| ActionContext::CreationToken{}, *this, NextActionId(), std::move(request), |
| std::move(action_impl)); |
| RETURN_IF_ERROR(action_context->Activate()); |
| |
| absl::string_view action_id = action_context->action_id(); |
| auto [it, inserted] = |
| running_actions_.try_emplace(action_id, std::move(action_context)); |
| if (!inserted) { |
| return absl::AlreadyExistsError( |
| absl::StrCat("Duplicate action ID: ", action_context->action_id())); |
| } |
| return it->second.get(); |
| } |
| |
| void ActionContextManager::FinishAction( |
| const safepower_agent_proto::Action& action, |
| ActionContext::Action action_impl) { |
| absl::MutexLock lock(&actions_mutex_); |
| auto found_action_impl = actions_.find(action); |
| if (found_action_impl == actions_.end()) { |
| LOG(DFATAL) << "Action not found: " << action; |
| return; |
| } |
| if (found_action_impl->second != nullptr) { |
| LOG(DFATAL) << "Action not started: " << action; |
| return; |
| } |
| found_action_impl->second = std::move(action_impl); |
| } |
| |
| absl::Nullable<ActionContext*> ActionContextManager::GetActionContext( |
| absl::string_view action_id) { |
| absl::MutexLock lock(&actions_mutex_); |
| auto found_action_context = running_actions_.find(action_id); |
| if (found_action_context == running_actions_.end()) { |
| return nullptr; |
| } |
| return found_action_context->second.get(); |
| } |
| |
| void ActionContextManager::GetSupportedActions( |
| safepower_agent_proto::GetSupportedActionsResponse& response) const { |
| absl::MutexLock lock(&actions_mutex_); |
| for (const auto& [action, _] : actions_) { |
| response.add_actions()->CopyFrom(action); |
| } |
| } |
| |
| } // namespace safepower_agent |