blob: 3f648540048cdb6836bccbd63b3d37982a298042 [file] [log] [blame]
#include "safepower_agent.h"
#include <memory>
#include <utility>
#include "action_context.h"
#include "daemon_context.h"
#include "bmc/convert_status.h"
#include "safepower_agent.pb.h"
#include "state_change_reactor.h"
#include "absl/base/nullability.h"
#include "absl/log/log.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/time/time.h"
#include "grpcpp/server_context.h"
#include "grpcpp/support/server_callback.h"
#include "grpcpp/support/status.h"
namespace safepower_agent {
using safepower_agent_proto::ActionStateLog;
using safepower_agent_proto::SystemState;
template <typename StateProto>
StateChangeWriteReactor<StateProto>* DetachReactor(
std::unique_ptr<StateChangeWriteReactor<StateProto>> reactor) {
return StateChangeWriteReactor<StateProto>::Detach(std::move(reactor));
}
template <typename Reactor>
bool AbortWhileDisruptionExpected(Reactor& reactor) {
absl::Duration disruption =
DaemonContext::Get().disruption_manager().PendingDisruption();
if (disruption > absl::ZeroDuration()) {
LOG(WARNING) << "Failing RPC because disruption is expected in "
<< disruption;
reactor->Finish(
{grpc::StatusCode::FAILED_PRECONDITION,
absl::StrCat(
"New operations not allowed because disruption is expected in ",
absl::FormatDuration(disruption))});
return true;
}
return false;
}
grpc::ServerUnaryReactor* SafepowerLocalAgentImpl::StartAction(
grpc::CallbackServerContext* context,
const safepower_agent_proto::StartActionRequest* request,
safepower_agent_proto::StartActionResponse* response) {
grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
if (AbortWhileDisruptionExpected(reactor)) return reactor;
absl::StatusOr<ActionContext* > action_context =
action_context_manager_->StartAction(*request);
if (!action_context.ok()) {
LOG(ERROR) << "Failed to start action: " << action_context.status();
} else {
response->set_response_id(action_context.value()->action_id());
LOG(INFO) << "Started action " << action_context.value()->action_id()
<< " with request " << request;
}
reactor->Finish(ConvertStatus(action_context.status()));
return reactor;
}
grpc::ServerWriteReactor<safepower_agent_proto::ActionStateLog>*
SafepowerLocalAgentImpl::MonitorAction(
grpc::CallbackServerContext* /*context*/,
const safepower_agent_proto::MonitorActionRequest* request) {
auto reactor = std::make_unique<StateChangeWriteReactor<ActionStateLog>>();
if (AbortWhileDisruptionExpected(reactor)) {
return DetachReactor(std::move(reactor));
}
ActionContext* action_context =
action_context_manager_->GetActionContext(request->response_id());
if (action_context == nullptr) {
LOG(ERROR) << "Action " << request->response_id() << " not found";
reactor->Finish(
{grpc::StatusCode::NOT_FOUND,
absl::StrCat("Action ", request->response_id(), " not found")});
return DetachReactor(std::move(reactor));
}
reactor->Connect(action_context->action_state_updater());
return DetachReactor(std::move(reactor));
}
grpc::ServerWriteReactor<SystemState>* SafepowerLocalAgentImpl::MonitorState(
grpc::CallbackServerContext* /*context*/,
const safepower_agent_proto::MonitorStateRequest* /*request*/) {
auto reactor = std::make_unique<
StateChangeWriteReactor<safepower_agent_proto::SystemState>>();
if (AbortWhileDisruptionExpected(reactor)) {
return DetachReactor(std::move(reactor));
}
reactor->Connect(system_state_updater_);
return DetachReactor(std::move(reactor));
}
grpc::ServerUnaryReactor* SafepowerLocalAgentImpl::GetSupportedActions(
grpc::CallbackServerContext* context,
const safepower_agent_proto::GetSupportedActionsRequest* /*request*/,
safepower_agent_proto::GetSupportedActionsResponse* response) {
grpc::ServerUnaryReactor* reactor = context->DefaultReactor();
action_context_manager_->GetSupportedActions(*response);
reactor->Finish(grpc::Status::OK);
return reactor;
}
} // namespace safepower_agent