| #ifndef PRODUCTION_BORG_MGMT_NODE_PROXY_SAFEPOWER_SAFEPOWER_AGENT_ACTION_CONTEXT_H_ |
| #define PRODUCTION_BORG_MGMT_NODE_PROXY_SAFEPOWER_SAFEPOWER_AGENT_ACTION_CONTEXT_H_ |
| |
| #include <cstddef> |
| #include <cstdint> |
| #include <memory> |
| #include <optional> |
| #include <string> |
| #include <utility> |
| |
| #include "action_hash.h" |
| #include "condition.h" |
| #include "safepower_agent.pb.h" |
| #include "state_persistence.pb.h" |
| #include "state_updater.h" |
| #include "utils.pb.h" |
| #include "absl/base/nullability.h" |
| #include "absl/base/thread_annotations.h" |
| #include "absl/container/flat_hash_map.h" |
| #include "absl/functional/any_invocable.h" |
| #include "absl/hash/hash.h" |
| #include "absl/status/status.h" |
| #include "absl/status/statusor.h" |
| #include "absl/strings/str_cat.h" |
| #include "absl/strings/string_view.h" |
| #include "absl/synchronization/mutex.h" |
| |
| namespace safepower_agent { |
| class ActionContext; |
| |
| // Used to start and manage actions. Handles retrieving persistent state and |
| // scheduling new actions. |
| class ActionContextManager { |
| public: |
| // Action is a function that is called when the action is ready to be |
| // executed. |
| // The first argument is the proto representation of the action. |
| // The second argument is a callback function that is called when the action |
| // is finished. The callback takes a single argument: the status of the |
| // action. |
| // It is worth noting that actions are non-blocking. To allow for this, the |
| // callback function is passed by value. This allows the caller to invoke the |
| // callback at a later time. |
| using Action = |
| absl::AnyInvocable<void(const safepower_agent_proto::Action&, |
| absl::AnyInvocable<void(absl::Status) &&>)>; |
| |
| explicit ActionContextManager( |
| std::shared_ptr<StateUpdater<safepower_agent_proto::SystemState>> |
| state_updater, |
| std::shared_ptr<StateUpdater<safepower_agent_proto::SystemState>> |
| global_state_updater) |
| : state_updater_(std::move(state_updater)), |
| global_state_updater_(std::move(global_state_updater)) {} |
| ActionContextManager(ActionContextManager&& other) = delete; |
| ActionContextManager& operator=(ActionContextManager&& other) = delete; |
| |
| absl::Status RegisterAction(const safepower_agent_proto::Action& action, |
| Action action_impl) |
| ABSL_LOCKS_EXCLUDED(actions_mutex_); |
| |
| absl::Status LoadSavedActions(); |
| |
| absl::StatusOr<ActionContext* > StartAction( |
| safepower_agent_proto::StartActionRequest request) |
| ABSL_LOCKS_EXCLUDED(actions_mutex_); |
| void FinishAction(const safepower_agent_proto::Action& action, |
| Action action_impl) ABSL_LOCKS_EXCLUDED(actions_mutex_); |
| |
| ActionContext* GetActionContext( |
| const borg_mgmt::node_proxy::safepower::utils::FlightRecordRequest& |
| flight_record) |
| ABSL_LOCKS_EXCLUDED(actions_mutex_); |
| |
| void GetSupportedActions( |
| safepower_agent_proto::GetSupportedActionsResponse& response) const |
| ABSL_LOCKS_EXCLUDED(actions_mutex_); |
| |
| const |
| std::shared_ptr<StateUpdater<safepower_agent_proto::SystemState>>& |
| system_state_updater() const { |
| return state_updater_; |
| } |
| |
| const |
| std::shared_ptr<StateUpdater<safepower_agent_proto::SystemState>>& |
| global_system_state_updater() const { |
| return global_state_updater_; |
| } |
| |
| // This is just for use by mutex annotations. Since it return const, it |
| // can't be used to acquire the mutex. |
| const absl::Mutex& actions_mutex() const { return actions_mutex_; } |
| |
| absl::Status validateFlightRecord( |
| const borg_mgmt::node_proxy::safepower::utils::FlightRecordRequest& record); |
| |
| private: |
| absl::StatusOr<Action> ReserveAction( |
| const safepower_agent_proto::Action& action) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(actions_mutex_); |
| |
| absl::StatusOr< std::unique_ptr<ActionContext>> |
| ReloadActionContext( |
| safepower_agent_persistence_proto::SavedAction saved_action) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(actions_mutex_); |
| |
| std::shared_ptr<StateUpdater<safepower_agent_proto::SystemState>> |
| state_updater_; |
| std::shared_ptr<StateUpdater<safepower_agent_proto::SystemState>> |
| global_state_updater_; |
| |
| mutable absl::Mutex actions_mutex_; |
| absl::flat_hash_map<safepower_agent_proto::Action, Action, ActionHash, |
| ActionEq> |
| actions_ ABSL_GUARDED_BY(actions_mutex_); |
| // ActionContext owns the key. |
| |
| struct FlightRecordHash { |
| std::size_t operator()( |
| const borg_mgmt::node_proxy::safepower::utils::FlightRecordRequest& |
| flight_record) const { |
| return absl::HashOf(std::make_pair(flight_record.flight_name(), |
| flight_record.step_id())); |
| } |
| }; |
| struct FlightRecordEq { |
| bool operator()( |
| const borg_mgmt::node_proxy::safepower::utils::FlightRecordRequest& lhs, |
| const borg_mgmt::node_proxy::safepower::utils::FlightRecordRequest& rhs) |
| const { |
| return lhs.flight_name() == rhs.flight_name() && |
| lhs.step_id() == rhs.step_id(); |
| } |
| }; |
| |
| absl::flat_hash_map< |
| borg_mgmt::node_proxy::safepower::utils::FlightRecordRequest, |
| std::unique_ptr<ActionContext>, FlightRecordHash, |
| FlightRecordEq> |
| running_actions_ ABSL_GUARDED_BY(actions_mutex_); |
| }; |
| |
| // Represents a single action execution. |
| class ActionContext { |
| constexpr static int kMaxStateChangeRetryCount = 4; |
| |
| public: |
| // Only allow ActionContext to be created by ActionContextManager. |
| struct CreationToken { |
| private: |
| CreationToken() = default; |
| friend class ActionContextManager; |
| }; |
| |
| using Action = ActionContextManager::Action; |
| |
| explicit ActionContext( |
| CreationToken token, ActionContextManager& manager, |
| safepower_agent_proto::StartActionRequest request, |
| Action action_impl = {}, |
| safepower_agent_proto::ActionStateLog initial_state = NewInitialState()); |
| ~ActionContext(); |
| |
| ActionContext(const ActionContext& other) = delete; |
| ActionContext& operator=(const ActionContext& other) = delete; |
| |
| const borg_mgmt::node_proxy::safepower::utils::FlightRecordRequest& |
| flight_record() const { |
| return request_.flight_record(); |
| } |
| const safepower_agent_proto::StartActionRequest& request() const { |
| return request_; |
| } |
| |
| |
| std::shared_ptr<StateUpdater<safepower_agent_proto::ActionStateLog>> |
| action_state_updater() const { |
| return action_state_updater_; |
| } |
| |
| absl::Status Activate(); |
| |
| static bool IsFinalState(safepower_agent_proto::ActionState state) { |
| return state == safepower_agent_proto::ACTION_STATE_SUCCESS || |
| state == safepower_agent_proto::ACTION_STATE_ERROR; |
| } |
| |
| private: |
| std::optional<absl::Status> CheckValidation( |
| const safepower_agent_proto::SystemState& state); |
| |
| absl::Status EnterStateInit() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_); |
| absl::Status EnterStateRunningAction() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_); |
| absl::Status EnterStateCheckingPrecondition() |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_); |
| absl::Status EnterStateValidatingFinalState() |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_); |
| |
| void NextStateInit() ABSL_LOCKS_EXCLUDED(mutex_); |
| void NextStatePreconditionMatched(absl::Status status, |
| Condition::MatchList matches) |
| ABSL_LOCKS_EXCLUDED(mutex_); |
| void NextStateActionRan(absl::Status status) ABSL_LOCKS_EXCLUDED(mutex_); |
| void NextStateValidationCompleted(absl::Status status, |
| Condition::MatchList matches) |
| ABSL_LOCKS_EXCLUDED(mutex_); |
| |
| void RunAction() ABSL_LOCKS_EXCLUDED(mutex_); |
| |
| absl::Status StartCheckingCondition( |
| Condition& condition, |
| absl::AnyInvocable<void(absl::Status, Condition::MatchList)> callback) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_); |
| |
| std::string execution_task_name() const { |
| // The execution task name must be unique for each action. |
| // As the flight name and step can be any characters, we add a hash to |
| // The execution task name to ensure that the name is unique. |
| uint64_t hash = absl::HashOf(std::make_pair(flight_record().flight_name(), |
| flight_record().step_id())); |
| return absl::StrCat(flight_record().flight_name(), "-", |
| flight_record().step_id(), "-", hash, ".execution"); |
| } |
| |
| std::string response_task_name() const { |
| return absl::StrCat(execution_task_name(), ".response"); |
| } |
| |
| void SetState(safepower_agent_proto::ActionState new_state, |
| safepower_agent_proto::ActionStateChange change_info = {}) |
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_); |
| void Finish(Action action_impl); |
| |
| absl::Mutex mutex_ ABSL_ACQUIRED_AFTER(manager_.actions_mutex()); |
| ActionContextManager& manager_; |
| const safepower_agent_proto::StartActionRequest request_; |
| Action action_impl_ ABSL_GUARDED_BY(mutex_); |
| std::optional<Condition> precondition_; |
| std::optional<Condition> validation_; |
| |
| std::shared_ptr<StateUpdater<safepower_agent_proto::ActionStateLog>> |
| action_state_updater_; |
| static safepower_agent_proto::ActionStateLog NewInitialState(); |
| }; |
| |
| } // namespace safepower_agent |
| |
| #endif // PRODUCTION_BORG_MGMT_NODE_PROXY_SAFEPOWER_SAFEPOWER_AGENT_ACTION_CONTEXT_H_ |