blob: 6cdb554f89db3d543e2e356a08b152bc309c3367 [file] [log] [blame]
#include "action_context.h"
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include "google/protobuf/timestamp.pb.h"
#include "condition.h"
#include "convert_proto.h"
#include "daemon_context.h"
#include "safepower_agent.pb.h"
#include "state_persistence.pb.h"
#include "state_updater.h"
#include "utils.pb.h"
#include "absl/base/nullability.h"
#include "absl/functional/any_invocable.h"
#include "absl/functional/bind_front.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "bmc/status_macros.h"
namespace safepower_agent {
using borg_mgmt::node_proxy::safepower::utils::FlightRecordRequest;
using ::safepower_agent_persistence_proto::SavedActions;
using ::safepower_agent_proto::ActionState;
using ::safepower_agent_proto::ActionStateChange;
using ::safepower_agent_proto::ActionStateLog;
// Creates the state log for a brand-new action: state INIT, stamped with the
// daemon epoch, and a single history entry recording the creation time.
ActionStateLog ActionContext::NewInitialState() {
  ActionStateLog log;
  log.set_current_state(safepower_agent_proto::ACTION_STATE_INIT);
  log.set_epoch_ms(DaemonContext::Get().epoch_ms());
  ActionStateChange* creation = log.add_history();
  SetTimestampToNow(*creation->mutable_changed_at());
  return log;
}
// Returns the time of the first history entry in `state`, which is treated as
// the action's start time. If the log has no history (not expected), this
// reports a DFATAL error and falls back to the daemon's current time.
static absl::Time GetStartTime(
    const safepower_agent_proto::ActionStateLog& state) {
  if (state.history_size() > 0) [[likely]] {
    return ConvertTime(state.history(0).changed_at());
  }
  LOG(DFATAL) << "No start time in initial state";
  return DaemonContext::Get().now();
}
// Constructs a context that drives `action_impl` through the action state
// machine for `request`, starting from `initial_state`.
//
// The precondition and validation conditions are only created when the
// request specifies them. Both are anchored at the start time taken from the
// first history entry of `initial_state` (GetStartTime logs DFATAL and falls
// back to the current time if there is no history).
//
// NOTE(review): the precondition_/validation_ initializers read request_, so
// this relies on request_ being declared before them in the class definition
// (members are initialized in declaration order, not initializer-list
// order) -- confirm in the header. initial_state is copied, not moved, into
// the state updater; moving it would only be safe if action_state_updater_ is
// declared after both condition members.
ActionContext::ActionContext(CreationToken /*token*/,
ActionContextManager& manager,
safepower_agent_proto::StartActionRequest request,
Action action_impl, ActionStateLog initial_state)
: manager_(manager),
request_(std::move(request)),
action_impl_(std::move(action_impl)),
// Only built when the request carries a precondition.
precondition_(
request_.has_precondition()
? std::make_optional<Condition>(request_.precondition(),
GetStartTime(initial_state))
: std::nullopt),
// Only built when the request carries a final-state validation.
validation_(request_.has_validation()
? std::make_optional<Condition>(
request_.validation(), GetStartTime(initial_state))
: std::nullopt),
action_state_updater_(
std::make_shared<StateUpdater<ActionStateLog>>(initial_state)) {}
// Starts waiting for `condition` against both the local and the global system
// state updaters, with `callback` receiving the match result. The wait is
// registered under this action's execution task name.
//
// If the wait cannot be started, the action is moved to ACTION_STATE_ERROR
// and the failure is returned.
//
// NOTE(review): SetState(ACTION_STATE_ERROR) already swaps action_impl_ for
// nullptr, so the Finish() call below has nothing left to hand back. Also,
// when this is reached from SetState() (via EnterStateCheckingPrecondition /
// EnterStateValidatingFinalState), SetState() sees the returned error and
// transitions to ACTION_STATE_ERROR a second time. Confirm whether the error
// transition should live in only one of the two places.
absl::Status ActionContext::StartCheckingCondition(
    Condition& condition,
    absl::AnyInvocable<void(absl::Status, Condition::MatchList)> callback) {
  absl::Status wait_status = condition.WaitForMatch(
      execution_task_name(), manager_.system_state_updater(),
      manager_.global_system_state_updater(), std::move(callback));
  if (wait_status.ok()) {
    return absl::OkStatus();
  }
  LOG(ERROR) << "Failed to wait for condition for "
             << flight_record().DebugString() << ": " << wait_status;
  ActionStateChange failure;
  SetStatus(*failure.mutable_status(), wait_status);
  SetState(ActionState::ACTION_STATE_ERROR, std::move(failure));
  Finish(std::move(action_impl_));
  return wait_status;
}
// Resumes (or starts) the state machine from whatever state the action is
// currently in. Fresh actions are persisted and scheduled. Resumed actions
// continue from their recorded state. Final states are left alone.
// Returns an error if the recorded state is unknown or if entering the
// resumed state fails.
absl::Status ActionContext::Activate() {
  absl::MutexLock lock(mutex_);
  const ActionState current = action_state_updater_->state().current_state();
  LOG(INFO) << "Activate " << flight_record().DebugString() << " in state "
            << safepower_agent_proto::ActionState_Name(current);
  switch (current) {
    case safepower_agent_proto::ACTION_STATE_INIT:
      RETURN_IF_ERROR(EnterStateInit());
      break;

    case safepower_agent_proto::ACTION_STATE_CHECKING_PRECONDITION:
      if (!precondition_.has_value()) {
        // Inconsistent saved state: there is nothing to check, so move on.
        LOG(WARNING)
            << flight_record().DebugString()
            << " has no precondition but is in checking precondition state";
        SetState(ActionState::ACTION_STATE_RUNNING_ACTION);
        break;
      }
      return EnterStateCheckingPrecondition();

    case safepower_agent_proto::ACTION_STATE_RUNNING_ACTION:
      // The action is not re-run after a restart.
      LOG(INFO) << flight_record().DebugString()
                << " resumed in running action state. Assuming the action was "
                   "already run successfully.";
      SetState(validation_.has_value()
                   ? ActionState::ACTION_STATE_VALIDATING_FINAL_STATE
                   : ActionState::ACTION_STATE_SUCCESS);
      break;

    case safepower_agent_proto::ACTION_STATE_VALIDATING_FINAL_STATE: {
      if (!validation_.has_value()) {
        // Inconsistent saved state: there is nothing to validate against.
        ActionStateChange failure;
        SetStatus(*failure.mutable_status(),
                  absl::InvalidArgumentError(
                      "No validation specified in validating state"));
        LOG(WARNING) << flight_record().DebugString()
                     << " has no validation but is in validating state";
        SetState(ActionState::ACTION_STATE_ERROR, std::move(failure));
        break;
      }
      return EnterStateValidatingFinalState();
    }

    case safepower_agent_proto::ACTION_STATE_ERROR:
    case safepower_agent_proto::ACTION_STATE_SUCCESS:
      // Already finished; nothing to resume.
      break;

    default:
      return absl::InvalidArgumentError(
          absl::StrCat("Action state is unknown:",
                       safepower_agent_proto::ActionState_Name(current)));
  }
  return absl::OkStatus();
}
// Cancels any scheduled work for this action and hands the action
// implementation back to the manager.
//
// Both the execution task and the response task are cancelled. A NotFound
// result is expected when the task already finished or was never scheduled.
// Any other failure is reported with LOG(DFATAL). Cancellation is not
// retried.
ActionContext::~ActionContext() {
  // Issue both cancellations before inspecting either result so that as much
  // as possible is cancelled.
  const absl::Status cancel_status =
      DaemonContext::Get().scheduler().CancelCall(execution_task_name());
  const absl::Status response_cancel_status =
      DaemonContext::Get().scheduler().CancelCall(response_task_name());
  // Report each unexpected failure independently. The previous
  // do { continue; } while (false) construct never retried, and it skipped
  // reporting the second failure whenever the first cancellation failed.
  if (!cancel_status.ok() && !absl::IsNotFound(cancel_status)) {
    LOG(DFATAL) << "Failed to cancel action " << flight_record().DebugString()
                << ": " << cancel_status;
  }
  if (!response_cancel_status.ok() &&
      !absl::IsNotFound(response_cancel_status)) {
    LOG(DFATAL) << "Failed to cancel action response callback for "
                << execution_task_name() << ": " << response_cancel_status;
  }
  // If another thread still holds mutex_, the action is still running. In
  // that case, do not touch action_impl_.
  if (!mutex_.try_lock()) {
    LOG(DFATAL) << "Action " << flight_record().DebugString()
                << " is still running";
    return;
  }
  Action action_impl = std::move(action_impl_);
  mutex_.unlock();
  // Finish() calls the manager's FinishAction(), which takes the manager's
  // actions_mutex_, so it runs only after mutex_ is released.
  Finish(std::move(action_impl));
}
// Returns `action_impl` to the manager so the action can be started again.
// A null implementation means it was already handed back, so nothing is done.
void ActionContext::Finish(Action action_impl) {
  if (action_impl == nullptr) {
    return;
  }
  manager_.FinishAction(request_.action(), std::move(action_impl));
}
// Appends a copy of every matched condition in `matches` to
// `change.matching_conditions`, preserving their order.
static void AssignConditions(ActionStateChange& change,
                             const Condition::MatchList& matches) {
  for (const safepower_agent_proto::Condition* matched : matches) {
    change.add_matching_conditions()->CopyFrom(*matched);
  }
}
// Transitions the action to `new_state`, recording `change_info` as the new
// history entry.
//
// The entry action for `new_state` runs first: start checking a condition,
// schedule the action, or schedule Finish() for a final state. If it fails,
// the action is moved to ACTION_STATE_ERROR instead. If entering
// ACTION_STATE_ERROR itself fails, the failure is reported with LOG(DFATAL)
// and the transition is abandoned. Retrying ERROR from inside the ERROR
// transition would recurse without bound. On success, the state change is
// published to the state updater and persisted. A persistence failure is
// logged but not propagated.
void ActionContext::SetState(safepower_agent_proto::ActionState new_state,
                             ActionStateChange change_info) {
  LOG(INFO) << "SetState " << flight_record().DebugString() << " "
            << safepower_agent_proto::ActionState_Name(new_state) << ": "
            << change_info;
  // Build the incremental log entry describing this transition.
  ActionStateLog state_change;
  state_change.set_epoch_ms(DaemonContext::Get().epoch_ms());
  state_change.set_current_state(new_state);
  ActionStateChange* history_item = state_change.add_history();
  *history_item = std::move(change_info);
  SetTimestampToNow(*history_item->mutable_changed_at());
  history_item->set_previous_state(
      action_state_updater_->state().current_state());

  // Run the entry action for the new state.
  bool is_final = false;
  absl::Status status;
  switch (new_state) {
    default:
      LOG(DFATAL) << "Unexpected state: "
                  << safepower_agent_proto::ActionState_Name(new_state);
      break;
    case safepower_agent_proto::ACTION_STATE_CHECKING_PRECONDITION:
      status = EnterStateCheckingPrecondition();
      break;
    case safepower_agent_proto::ACTION_STATE_RUNNING_ACTION:
      status = EnterStateRunningAction();
      break;
    case safepower_agent_proto::ACTION_STATE_VALIDATING_FINAL_STATE:
      status = EnterStateValidatingFinalState();
      break;
    case safepower_agent_proto::ACTION_STATE_SUCCESS:
    case safepower_agent_proto::ACTION_STATE_ERROR:
      // Hand the implementation back asynchronously, on the execution task.
      status = DaemonContext::Get().scheduler().DelayCall(
          [this, action_impl = std::exchange(action_impl_, nullptr)]() mutable {
            Finish(std::move(action_impl));
          },
          absl::ZeroDuration(), execution_task_name());
      is_final = true;
      break;
  }

  if (!status.ok()) {
    LOG(ERROR) << "Failed to enter state "
               << safepower_agent_proto::ActionState_Name(new_state) << ": "
               << status;
    // Bound the recursion: a failure while entering ERROR must not be
    // retried by entering ERROR again, whatever the current state is.
    if (new_state == ActionState::ACTION_STATE_ERROR) {
      LOG(DFATAL) << "State change failed too many times";
      return;
    }
    ActionStateChange error_change;
    SetStatus(*error_change.mutable_status(), status);
    SetState(ActionState::ACTION_STATE_ERROR, std::move(error_change));
    return;
  }

  action_state_updater_->UpdateState(state_change, is_final);

  // Persist the transition, keyed by this action's flight record.
  SavedActions saved_actions;
  safepower_agent_persistence_proto::SavedAction* saved_action =
      saved_actions.add_saved_action_records()->mutable_actions();
  *saved_action->mutable_original_request()->mutable_flight_record() =
      flight_record();
  *saved_action->mutable_action_state_log() = std::move(state_change);
  absl::Status write_status =
      DaemonContext::Get().persistent_storage_manager().WriteSavedActionsChange(
          saved_actions);
  if (!write_status.ok()) {
    LOG(ERROR) << "Failed to persist action state: " << write_status;
  }
}
// Persists the full original request together with the current state log,
// then schedules NextStateInit() on the execution task. LoadSavedActions
// reads this record back. Returns the persistence error or the scheduling
// status.
absl::Status ActionContext::EnterStateInit() {
  SavedActions saved_actions;
  safepower_agent_persistence_proto::SavedAction& saved_action =
      *saved_actions.add_saved_action_records()->mutable_actions();
  *saved_action.mutable_original_request() = request_;
  *saved_action.mutable_action_state_log() = action_state_updater_->state();
  RETURN_IF_ERROR(
      DaemonContext::Get().persistent_storage_manager().WriteSavedActionsChange(
          saved_actions));
  return DaemonContext::Get().scheduler().DelayCall(
      [this] { NextStateInit(); }, absl::ZeroDuration(),
      execution_task_name());
}
void ActionContext::NextStateInit() {
absl::MutexLock lock(mutex_);
if (precondition_.has_value()) {
SetState(ActionState::ACTION_STATE_CHECKING_PRECONDITION);
} else {
SetState(ActionState::ACTION_STATE_RUNNING_ACTION);
}
}
// Starts waiting for the precondition. The result is delivered to
// NextStatePreconditionMatched. The caller must ensure precondition_ is set.
absl::Status ActionContext::EnterStateCheckingPrecondition() {
  return StartCheckingCondition(
      *precondition_,
      [this](absl::Status status, Condition::MatchList matches) {
        NextStatePreconditionMatched(std::move(status), std::move(matches));
      });
}
// Handles the precondition result. The matched conditions are recorded either
// way. An error moves the action to ERROR; otherwise it moves on to running
// the action.
void ActionContext::NextStatePreconditionMatched(absl::Status status,
                                                 Condition::MatchList matches) {
  absl::MutexLock lock(mutex_);
  ActionStateChange change;
  AssignConditions(change, matches);
  if (status.ok()) {
    SetState(ActionState::ACTION_STATE_RUNNING_ACTION, std::move(change));
    return;
  }
  LOG(ERROR) << "Precondition failed: " << status;
  SetStatus(*change.mutable_status(), status);
  SetState(safepower_agent_proto::ACTION_STATE_ERROR, std::move(change));
}
// Evaluates the validation condition once against `state`.
// Returns std::nullopt when the outcome is still undecided (no error and no
// match). Otherwise it schedules NextStateValidationCompleted with the result
// and returns the scheduling status. The caller must ensure validation_ is
// set.
std::optional<absl::Status> ActionContext::CheckValidation(
    const safepower_agent_proto::SystemState& state) {
  auto [match_status, match_list] = validation_->CheckState(state);
  const bool undecided = match_status.ok() && match_list.empty();
  if (undecided) {
    return std::nullopt;
  }
  return DaemonContext::Get().scheduler().DelayCall(
      std::bind(&ActionContext::NextStateValidationCompleted, this,
                match_status, std::move(match_list)),
      absl::ZeroDuration(), execution_task_name());
}
// Enters RUNNING_ACTION. If a validation condition exists and is already
// decided by the local state, or failing that by the global system state, the
// action is skipped and validation completion is scheduled instead. Otherwise
// RunAction() is scheduled on the execution task.
absl::Status ActionContext::EnterStateRunningAction() {
  if (validation_.has_value()) {
    if (std::optional<absl::Status> decided =
            CheckValidation(manager_.system_state_updater()->state());
        decided.has_value()) {
      return *std::move(decided);
    }
    if (std::optional<absl::Status> decided =
            CheckValidation(manager_.global_system_state_updater()->state());
        decided.has_value()) {
      return *std::move(decided);
    }
  }
  return DaemonContext::Get().scheduler().DelayCall(
      [this] { RunAction(); }, absl::ZeroDuration(), execution_task_name());
}
void ActionContext::RunAction() {
absl::MutexLock lock(mutex_);
action_impl_(request_.action(), [this](absl::Status status) {
// NextStateActionRan must be called asynchronously to avoid deadlocks. It
// needs to be run as a different task name because the task running
// RunAction may not have completed when the action callback is called.
absl::Status delay_status = DaemonContext::Get().scheduler().DelayCall(
std::bind(&ActionContext::NextStateActionRan, this, status),
absl::ZeroDuration(), response_task_name());
if (!delay_status.ok()) {
LOG(DFATAL) << "Failed to delay next state: " << delay_status;
NextStateActionRan(status);
}
});
}
// Handles the action's completion status. On success, the action moves to
// final-state validation when one is configured, or directly to SUCCESS
// otherwise. On failure it moves to ERROR.
void ActionContext::NextStateActionRan(absl::Status status) {
  absl::MutexLock lock(mutex_);
  if (status.ok()) {
    SetState(validation_.has_value()
                 ? ActionState::ACTION_STATE_VALIDATING_FINAL_STATE
                 : ActionState::ACTION_STATE_SUCCESS);
    return;
  }
  LOG(ERROR) << "Action failed: " << status;
  ActionStateChange failure;
  SetStatus(*failure.mutable_status(), status);
  SetState(ActionState::ACTION_STATE_ERROR, std::move(failure));
}
// Starts waiting for the final-state validation condition. The result is
// delivered to NextStateValidationCompleted. The caller must ensure
// validation_ is set.
absl::Status ActionContext::EnterStateValidatingFinalState() {
  return StartCheckingCondition(
      *validation_,
      [this](absl::Status status, Condition::MatchList matches) {
        NextStateValidationCompleted(std::move(status), std::move(matches));
      });
}
// Handles the validation result. The matched conditions are recorded either
// way. An error moves the action to ERROR; otherwise it moves to SUCCESS.
void ActionContext::NextStateValidationCompleted(absl::Status status,
                                                 Condition::MatchList matches) {
  absl::MutexLock lock(mutex_);
  ActionStateChange change;
  AssignConditions(change, matches);
  if (status.ok()) {
    SetState(ActionState::ACTION_STATE_SUCCESS, std::move(change));
    return;
  }
  LOG(ERROR) << "Validation failed: " << status;
  SetStatus(*change.mutable_status(), status);
  SetState(ActionState::ACTION_STATE_ERROR, std::move(change));
}
// Registers `action_impl` as the implementation of `action`.
// Returns AlreadyExists if `action` is already registered; the existing
// registration is kept.
absl::Status ActionContextManager::RegisterAction(
    const safepower_agent_proto::Action& action,
    ActionContext::Action action_impl) {
  absl::MutexLock lock(actions_mutex_);
  const bool inserted =
      actions_.try_emplace(action, std::move(action_impl)).second;
  if (inserted) {
    return absl::OkStatus();
  }
  return absl::AlreadyExistsError(
      absl::StrCat("Action already registered: ", action));
}
// Rebuilds an ActionContext from a persisted SavedAction and activates it.
// Actions that have not reached a final state reserve their implementation
// again. Final ones are reloaded without an implementation.
//
// NOTE(review): LoadSavedActions calls this while holding actions_mutex_. If
// Activate() fails, the context is destroyed here. ~ActionContext hands a
// reserved implementation back through FinishAction(), which locks
// actions_mutex_ again (a non-reentrant absl::Mutex), so that path looks like
// a self-deadlock. Consider returning the context to the caller instead.
absl::StatusOr< std::unique_ptr<ActionContext>>
ActionContextManager::ReloadActionContext(
    safepower_agent_persistence_proto::SavedAction saved_action) {
  const bool already_final = ActionContext::IsFinalState(
      saved_action.action_state_log().current_state());
  Action action_impl = nullptr;
  if (!already_final) {
    ASSIGN_OR_RETURN(action_impl,
                     ReserveAction(saved_action.original_request().action()));
  }
  std::unique_ptr<ActionContext> context = std::make_unique<ActionContext>(
      ActionContext::CreationToken{}, *this,
      std::move(*saved_action.mutable_original_request()),
      std::move(action_impl),
      std::move(*saved_action.mutable_action_state_log()));
  RETURN_IF_ERROR(context->Activate());
  return context;
}
// Reloads every persisted action and registers it as running.
//
// Records without a flight record are skipped with LOG(DFATAL). Actions that
// fail to reload are logged and skipped. A duplicate flight record aborts
// loading with AlreadyExists.
absl::Status ActionContextManager::LoadSavedActions() {
  ASSIGN_OR_RETURN(
      SavedActions saved_actions,
      DaemonContext::Get().persistent_storage_manager().ReadSavedActions());
  // A context rejected as a duplicate is destroyed only after actions_mutex_
  // is released. ~ActionContext hands its implementation back through
  // FinishAction(), which locks actions_mutex_ again. Declaring the holder
  // before the lock makes it outlive the lock, because locals are destroyed
  // in reverse order.
  std::unique_ptr<ActionContext> rejected_duplicate;
  absl::MutexLock lock(actions_mutex_);
  for (const auto& saved_action_record : saved_actions.saved_action_records()) {
    if (!saved_action_record.has_actions() ||
        !saved_action_record.actions().has_original_request() ||
        !saved_action_record.actions().original_request().has_flight_record()) {
      LOG(DFATAL) << "Saved action record is missing flight record: "
                  << saved_action_record.DebugString();
      continue;
    }
    // NOTE(review): if ReloadActionContext fails after reserving an action,
    // it destroys the context while actions_mutex_ is held here (see
    // ~ActionContext -> FinishAction).
    absl::StatusOr<std::unique_ptr<ActionContext>> action_context_or =
        ReloadActionContext(saved_action_record.actions());
    if (!action_context_or.ok()) {
      LOG(ERROR) << "Failed to reload action: " << action_context_or.status();
      continue;
    }
    const FlightRecordRequest& flight_record =
        saved_action_record.actions().original_request().flight_record();
    std::unique_ptr<ActionContext> action_context =
        *std::move(action_context_or);
    // try_emplace leaves action_context untouched when the key already exists.
    auto [it, inserted] =
        running_actions_.try_emplace(flight_record, std::move(action_context));
    if (!inserted) {
      absl::Status duplicate_status = absl::AlreadyExistsError(absl::StrCat(
          "Duplicate action ID: ", action_context->flight_record()));
      rejected_duplicate = std::move(action_context);
      return duplicate_status;
    }
  }
  return absl::OkStatus();
}
// Takes the registered implementation of `action` out of actions_ so that only
// one context can run it at a time. A null slot marks the action as reserved
// (FinishAction puts the implementation back).
// Expects actions_mutex_ to be held by the caller (StartAction and
// LoadSavedActions both hold it).
// Returns NotFound for an unregistered action, and FailedPrecondition if the
// action is already reserved.
absl::StatusOr<ActionContext::Action> ActionContextManager::ReserveAction(
    const safepower_agent_proto::Action& action) {
  auto found_action_impl = actions_.find(action);
  if (found_action_impl == actions_.end()) {
    return absl::NotFoundError(absl::StrCat("Action not found: ", action));
  }
  if (found_action_impl->second == nullptr) {
    return absl::FailedPreconditionError(
        absl::StrCat("Action already started: ", action));
  }
  // Explicitly null the slot. Moving alone leaves the callable in a "valid
  // but unspecified" state that is not guaranteed to compare equal to
  // nullptr. The checks above and in FinishAction depend on that sentinel.
  return std::exchange(found_action_impl->second, nullptr);
}
// Checks that `record` has a non-empty flight name and a non-empty step ID.
// Presence of both fields is checked before emptiness, so a missing field
// always reports its "is required" error first.
absl::Status ActionContextManager::validateFlightRecord(
    const FlightRecordRequest& record) {
  absl::string_view problem;
  if (!record.has_flight_name()) {
    problem = "Flight ID is required";
  } else if (!record.has_step_id()) {
    problem = "Step ID is required";
  } else if (record.flight_name().empty()) {
    problem = "Flight ID must not be empty";
  } else if (record.step_id().empty()) {
    problem = "Step ID must not be empty";
  }
  return problem.empty() ? absl::OkStatus()
                         : absl::InvalidArgumentError(problem);
}
// Validates `request`, reserves its action, creates and activates a new
// ActionContext, and registers it under the request's flight record.
// Returns a non-owning pointer to the running context.
//
// Errors:
//   - InvalidArgument for a malformed flight record.
//   - AlreadyExists if the flight record is already running. This is checked
//     before anything is reserved, persisted or scheduled.
//   - ReserveAction's NotFound or FailedPrecondition.
//   - Any Activate() failure.
absl::StatusOr<ActionContext* > ActionContextManager::StartAction(
    safepower_agent_proto::StartActionRequest request) {
  // Declared before the lock so that, on any early return, the lock is
  // released before the context is destroyed. ~ActionContext hands the
  // implementation back through FinishAction(), which locks actions_mutex_.
  std::unique_ptr<ActionContext> action_context;
  RETURN_IF_ERROR(validateFlightRecord(request.flight_record()));
  FlightRecordRequest flight_record = request.flight_record();
  absl::MutexLock lock(actions_mutex_);
  // Reject duplicates before reserving the action or calling Activate().
  // Activate() on a fresh context goes through EnterStateInit, which persists
  // the request and schedules work. That must not happen for a flight record
  // that is already running.
  if (running_actions_.find(flight_record) != running_actions_.end()) {
    return absl::AlreadyExistsError(
        absl::StrCat("Duplicate flight record: ", flight_record));
  }
  ASSIGN_OR_RETURN(ActionContext::Action action_impl,
                   ReserveAction(request.action()));
  action_context = std::make_unique<ActionContext>(
      ActionContext::CreationToken{}, *this, std::move(request),
      std::move(action_impl));
  RETURN_IF_ERROR(action_context->Activate());
  auto [it, inserted] =
      running_actions_.try_emplace(flight_record, std::move(action_context));
  if (!inserted) {
    // Not expected: the duplicate check above ran under the same lock.
    return absl::AlreadyExistsError(
        absl::StrCat("Duplicate flight record: ", flight_record));
  }
  return it->second.get();
}
// Returns a previously reserved implementation to its slot in actions_,
// making `action` available to start again. Unknown actions, and actions whose
// slot is not currently reserved (non-null), are reported with LOG(DFATAL) and
// ignored.
void ActionContextManager::FinishAction(
    const safepower_agent_proto::Action& action,
    ActionContext::Action action_impl) {
  absl::MutexLock lock(actions_mutex_);
  auto entry = actions_.find(action);
  if (entry == actions_.end()) {
    LOG(DFATAL) << "Action not found: " << action;
    return;
  }
  auto& slot = entry->second;
  if (slot != nullptr) {
    LOG(DFATAL) << "Action not started: " << action;
    return;
  }
  slot = std::move(action_impl);
}
// Looks up the running context registered for `flight_record`.
// Returns nullptr (after logging) if the flight record is malformed or unknown.
// The returned pointer is non-owning.
ActionContext* ActionContextManager::GetActionContext(
    const FlightRecordRequest& flight_record) {
  if (absl::Status status = validateFlightRecord(flight_record);
      !status.ok()) {
    LOG(ERROR) << "Invalid flight record: " << status;
    return nullptr;
  }
  absl::MutexLock lock(actions_mutex_);
  const auto entry = running_actions_.find(flight_record);
  if (entry != running_actions_.end()) {
    return entry->second.get();
  }
  LOG(ERROR) << "Flight record not found: " << flight_record;
  return nullptr;
}
// Appends every registered action (reserved or not) to `response.actions`.
void ActionContextManager::GetSupportedActions(
    safepower_agent_proto::GetSupportedActionsResponse& response) const {
  absl::MutexLock lock(actions_mutex_);
  for (const auto& entry : actions_) {
    *response.add_actions() = entry.first;
  }
}
} // namespace safepower_agent