blob: 954429993e9813e9f4568737b62226c707d493fc [file] [log] [blame] [edit]
#include "sse_plugin/events_manager.h"
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/base/no_destructor.h"
#include "absl/base/thread_annotations.h"
#include "absl/functional/bind_front.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/clock.h"
#include "utils/clock_interface.h"
#include "absl/time/time.h"
#include "redfish_query_engine/http/codes.h"
#include "nlohmann/json.hpp"
#include "nlohmann/json_fwd.hpp"
#include "metrics.h"
#include "redfish_plugin.h"
#include "request_response.h"
#include "sse/sse_parser.h"
#include "sse_plugin/event.h"
#include "vendor_events.pb.h"
#include "utils/std_thread.h"
#include "utils/status_macros.h"
#include "utils/thread.h"
#include "utils/timer.h"
#include "voyager/priority_queue.hpp"
namespace milotic {
namespace {
// VLOG level used for all verbose tracing in this file.
constexpr int kVlogVerbosity = 1;
}  // namespace

using ::ecclesia::HttpResponseCode::HTTP_CODE_REQUEST_OK;

// Redfish resource whose "ServerSentEventUri" field holds the SSE endpoint.
constexpr absl::string_view kRedfishEventServicePath =
    "/redfish/v1/EventService";
class RMCEventHandler : public ::milotic::RedfishPlugin::EventHandler {
public:
explicit RMCEventHandler(BaseEventsManager* events_manager,
voyager::TelemetryPriorityQueue* queue,
size_t init_timer_start_wait_sec,
size_t init_timer_restart_wait_sec,
absl::Clock& clock)
: events_manager_(events_manager),
queue_(queue),
init_wait_timer_(
absl::bind_front(&RMCEventHandler::InitTimeoutExpired, this),
"InitWaitTimer", clock),
init_timer_start_wait_(absl::Seconds(init_timer_start_wait_sec)),
init_timer_restart_wait_(absl::Seconds(init_timer_restart_wait_sec)) {}
~RMCEventHandler() override = default;
absl::Status OnResponse(const ProxyResponse& response) override;
absl::Status OnEvent(const google::protobuf::Message& event_message) override;
absl::Status OnEvent(const milotic::ServerSentEvent& event) override;
bool IsCancelled() const override;
private:
void InitTimeoutExpired();
BaseEventsManager* events_manager_;
voyager::TelemetryPriorityQueue* queue_;
Timer init_wait_timer_;
const absl::Duration init_timer_start_wait_;
const absl::Duration init_timer_restart_wait_;
};
void BaseEventsManager::StartMainThread() {
DCHECK(!main_thread_);
main_thread_ =
std::make_unique<StdThread>("MainThread", [this] { MainThreadRun(); });
absl::Status status = main_thread_->Start();
if (!status.ok()) {
LOG(ERROR) << "Failed to start main thread, status: " << status;
}
}
// MainThreadRun does the following
// - Get existing events from StorageManager and replays them
// - Start thread to stream new events from RMC and put them in EventQ
// - Process events from EventQ sequentially
void BaseEventsManager::MainThreadRun() {
VLOG(kVlogVerbosity) << "MainThreadRun";
if (absl::Status status = ReplayProcessedEvents(); !status.ok()) {
LOG(ERROR) << "Failed to replay processed events, status: " << status;
return;
}
VLOG(kVlogVerbosity) << "Replay done";
if (absl::Status status =
serving_state_timer_.Start(absl::Seconds(serving_state_timeout_sec_));
!status.ok()) {
LOG(ERROR) << "Failed to start serving state timer, status: " << status;
return;
}
// start StreamThread (gets events from from RMC and adds to Q).
stream_thread_ = std::make_unique<StdThread>("StreamThread",
[this] { PullEventsFromRMC(); });
absl::Status status = stream_thread_->Start();
if (!status.ok()) {
LOG(ERROR) << "Failed to start stream thread, status: " << status;
return;
}
LOG(INFO) << "Stream thread started";
ProcessEventQueue();
}
// Waits on clock_ for up to `duration`, waking early once plugin_state_ becomes
// kStop. Returns clock_.AwaitWithDeadline's result (presumably true when the
// stop condition was met before the deadline; confirm against the clock
// interface).
bool BaseEventsManager::InterruptibleSleep(absl::Duration duration) {
  absl::MutexLock lock(mutex_);
  auto stop_requested = [this]() ABSL_SHARED_LOCKS_REQUIRED(mutex_) {
    return plugin_state_ == PluginState::kStop;
  };
  const auto deadline = clock_.TimeNow() + duration;
  return clock_.AwaitWithDeadline(&mutex_, absl::Condition(&stop_requested),
                                  deadline);
}
// Replays the existing non-archived events into machine_state_ (with
// is_replay=true) to bootstrap the machine state, then calls
// SetState(kReplayDone, kInit). If retrieval from the storage manager fails,
// that error is returned and the state is left untouched.
absl::Status BaseEventsManager::ReplayProcessedEvents() {
  LOG(INFO) << "Retrieving processed events";
  ASSIGN_OR_RETURN(std::vector<milotic::Event> events,
                   storage_manager_->RetrieveProcessedEvents());
  LOG(INFO) << "Retrieved " << events.size() << " events for replay";
  for (milotic::Event& event : events) {
    machine_state_->UpdateStatus(event, /*is_replay=*/true);
  }
  SetState(PluginState::kReplayDone, PluginState::kInit);
  return absl::OkStatus();
}
// Creates a long running SSE connection to RMC and adds received events to
// EventQ and handles heartbeat.
// Retry on connection failures to RMC server.
void BaseEventsManager::PullEventsFromRMC() {
VLOG(kVlogVerbosity) << "PullEventsFromRMC";
// create persistent connection to RMC and wait for events and add to Q
auto rmc_handler =
RMCEventHandler(this, priority_queue_, init_timer_start_wait_sec_,
init_timer_restart_wait_sec_, clock_);
int attempt = 1;
while ((sse_connection_retry_max_attempts_ == 0 ||
attempt <= sse_connection_retry_max_attempts_) &&
!IsStopping()) {
// Can transition from Serving/ReplayDone
SetState(PluginState::kStreamStarting);
LOG(INFO) << "Getting SSE Uri, attempt: " << attempt;
absl::StatusOr<std::string> sse_uri_response;
sse_uri_response = GetRedfishSSEUri();
if (!sse_uri_response.ok()) {
LOG(ERROR) << "Failed to get SSE Uri, status: "
<< sse_uri_response.status();
if ((sse_connection_retry_max_attempts_ == 0 ||
attempt <= sse_connection_retry_max_attempts_) &&
!IsStopping()) {
InterruptibleSleep(sse_connection_retry_delay_);
}
attempt += 1;
continue;
}
// Wait for the events queue to clear before creating new connection
// This is to ensure we don't process duplicate events which have been
// already received by vBMC.
if (!priority_queue_->Empty()) {
VLOG(kVlogVerbosity) << "Event queue is not empty, waiting to clear";
voyager::Job job;
auto job_run = [](voyager::Job& job) -> absl::Status {
return absl::OkStatus();
};
CHECK_OK(priority_queue_->Enqueue(job, job_run, kEventsPriority));
job.Wait();
}
// For handling surrogate machines where SSE is not supported
if (skip_sse_connection_) {
LOG(INFO) << "ignoring SSE connection to RMC";
rmc_handler.OnResponse(ProxyResponse(HTTP_CODE_REQUEST_OK)).IgnoreError();
while (!IsStopping()) {
InterruptibleSleep(sse_connection_retry_delay_);
VLOG(kVlogVerbosity) << "Validing connection to RMC";
sse_uri_response = GetRedfishSSEUri();
if (!sse_uri_response.ok()) {
LOG(ERROR) << "Failed to get SSE Uri, status: "
<< sse_uri_response.status();
break;
}
}
continue;
}
LOG(INFO) << "Calling Subscribe, with LastEventId: "
<< machine_state_->LastEventId() << " attempt: " << attempt;
std::unique_ptr<ProxyRequest> request =
proxy_->CreateRequest(*sse_uri_response);
request->AddHeader("Accept", sse_accept_header_);
request->AddHeader("Last-Event-Id",
absl::StrCat(machine_state_->LastEventId()));
absl::Status status = proxy_->Subscribe(std::move(request), &rmc_handler);
VLOG(kVlogVerbosity) << "Subscribe ended with status: " << status;
if (!status.ok()) {
attempt += 1;
} else {
// reset attempt count on subscribe connection end with OK status.
attempt = 1;
}
// TODO: b/297219058 - Use retry delay provided by RMC if available.
if ((sse_connection_retry_max_attempts_ == 0 ||
attempt <= sse_connection_retry_max_attempts_) &&
!IsStopping()) {
InterruptibleSleep(sse_connection_retry_delay_);
}
}
VLOG(kVlogVerbosity) << "PullEventsFromRMC exiting";
}
// ProcessEventQueue waits on the EventQ for new events and
// processes them sequentially.
void BaseEventsManager::ProcessEventQueue() {
VLOG(kVlogVerbosity) << "ProcessEventQueue";
while (!IsStopping()) {
VLOG(kVlogVerbosity) << "waiting for event in ProcessQueue";
if (absl::Status status = priority_queue_->ProcessQueue(1); !status.ok()) {
LOG(ERROR) << "Failed to process queue, status: " << status;
return;
}
{
absl::MutexLock lock(mutex_);
processed_events_count_++;
}
}
}
// Stops the events manager without restarting it. In order, it:
//  - ends the serving-state timer,
//  - moves the plugin to kStop (the condition InterruptibleSleep waits for),
//  - interrupts the priority queue,
//  - joins and releases the stream thread, if any.
// The main thread is not joined here; the destructor does that after calling
// Shutdown().
void BaseEventsManager::Shutdown() {
  VLOG(kVlogVerbosity) << "Shutdown";
  serving_state_timer_.End();
  SetState(PluginState::kStop);
  // Interrupted before the join, presumably so that neither a main thread
  // blocked in ProcessQueue nor a stream thread waiting on the queue-drain
  // barrier keeps the join from completing.
  priority_queue_->InterruptProcessing();
  if (stream_thread_ != nullptr) {
    stream_thread_->Join();
    stream_thread_ = nullptr;
  }
}
// Tears down and restarts the events manager. Steps, in order:
//  1. Start reset_timer_ for reset_timeout_sec_ (presumably wired to
//     ResetTimeout(), which exits the process if the reset takes too long).
//  2. Move to kStop and join the stream thread.
//  3. Interrupt and reset the priority queue.
//  4. Join the main thread.
//  5. Reset the serving-state timer, the machine state, and
//     fetch_logs_on_init_expiry_, then reset reset_timer_.
//  6. Move kStop -> kInit, clear processed_events_count_, and start a new main
//     thread.
// NOTE(review): unlike Shutdown(), the queue is interrupted only after the
// stream thread has been joined. Suppose the stream thread is blocked in
// job.Wait() on the queue-drain barrier in PullEventsFromRMC, and the main
// thread has already left ProcessEventQueue. Then this join may only end via
// reset_timer_. Confirm the intended ordering.
void BaseEventsManager::Reset() {
  LOG(INFO) << "Resetting BaseEventsManager";
  CHECK_OK(reset_timer_.Start(absl::Seconds(reset_timeout_sec_)));
  VLOG(kVlogVerbosity) << "Setting plugin state to kStop";
  SetState(PluginState::kStop);
  LOG(INFO) << "Joining stream thread";
  if (stream_thread_ != nullptr) {
    stream_thread_->Join();
    stream_thread_ = nullptr;
    VLOG(kVlogVerbosity) << "Stream thread joined";
  }
  VLOG(kVlogVerbosity) << "Resetting priority queue";
  priority_queue_->InterruptProcessing();
  priority_queue_->Reset();
  LOG(INFO) << "Joining main thread";
  if (main_thread_ != nullptr) {
    main_thread_->Join();
    main_thread_ = nullptr;
    VLOG(kVlogVerbosity) << "Main thread joined";
  }
  VLOG(kVlogVerbosity) << "Resetting serving state timer";
  serving_state_timer_.Reset();
  VLOG(kVlogVerbosity) << "Resetting machine state";
  machine_state_->Reset();
  fetch_logs_on_init_expiry_ = false;
  // Reset only after the blocking joins above, so the timer covered them.
  reset_timer_.Reset();
  VLOG(kVlogVerbosity) << "Setting plugin state to kInit";
  SetState(PluginState::kInit, PluginState::kStop);
  {
    absl::MutexLock lock(mutex_);
    processed_events_count_ = 0;
  }
  LOG(INFO) << "Starting main thread";
  StartMainThread();
}
void BaseEventsManager::ResetTimeout() {
LOG(ERROR)
<< "Timeout reached while waiting for BaseEventsManager Reset, force "
"killing vBMC";
CommonMetrics::Get().events_manager_reset_timeout.Increment({});
exit(1);
}
// Shuts down (which also joins the stream thread), then joins the main thread
// if one exists.
BaseEventsManager::~BaseEventsManager() {
  Shutdown();
  if (main_thread_ == nullptr) return;
  main_thread_->Join();
  main_thread_ = nullptr;
}
void BaseEventsManager::FetchAndSaveLogs() {
absl::StatusOr<std::string> log_contents = GetRedfishLogs();
if (!log_contents.ok()) {
// TODO: b/293333696 - handle failures
LOG(ERROR) << "Failed to get redfish logs: " << log_contents.status();
return;
}
absl::Status status = storage_manager_->SaveLogs(
machine_state_->LastHealthChangeEvent(), *log_contents);
if (!status.ok()) {
if (absl::IsCancelled(status)) {
LOG(ERROR) << "Got cancelled status from storage manager, disconnecting";
storage_manager_->Disconnect();
return;
}
LOG(ERROR) << "SaveLogs failed: " << status;
}
}
// Issues a GET for `redfish_uri`. If `field` is the root pointer (""), returns
// the raw response body. Otherwise it returns the string at `field` in the JSON
// body.
//
// Returns InternalError if the response code is not HTTP 200, the body is
// empty, the body is not valid JSON, or `field` is missing or not a string.
// Errors from DispatchRequest are propagated unchanged.
absl::StatusOr<std::string> BaseEventsManager::GetRedfishbyURI(
    absl::string_view redfish_uri, const nlohmann::json::json_pointer& field) {
  // TODO: b/294860123 - Add metrics for each uri using hash table with counters
  const std::string field_path = field.to_string();
  VLOG(kVlogVerbosity) << "GetRedfishbyURI: " << redfish_uri
                       << " field: " << field_path;
  std::unique_ptr<ProxyRequest> request = proxy_->CreateRequest(redfish_uri);
  ProxyResponse response;
  ASSIGN_OR_RETURN(response, proxy_->DispatchRequest(
                                 milotic::RedfishPlugin::RequestVerb::kGet,
                                 std::move(request)));
  if (response.GetCode() != HTTP_CODE_REQUEST_OK) {
    return absl::InternalError(absl::StrCat(
        "GET failed from path `", redfish_uri, "`, response code: ",
        response.GetCode(), ", response body: ", response.GetBody()));
  }
  if (response.GetBody().empty()) {
    return absl::InternalError("Response body is empty");
  }
  // The root pointer means the caller wants the whole body verbatim.
  if (field_path.empty()) {
    return std::string(response.GetBody());
  }
  const nlohmann::json parsed = response.GetBodyJson();
  if (parsed.is_discarded()) {
    return absl::InternalError("Failed to parse response body");
  }
  // Look the field up without mutating the document. Non-const operator[]
  // with a json_pointer inserts nulls for missing paths. It also throws when
  // the path runs through a non-object value, e.g. when the body is a JSON
  // array or scalar.
  const std::string* value =
      parsed.contains(field) ? parsed.at(field).get_ptr<const std::string*>()
                             : nullptr;
  if (value == nullptr) {
    return absl::InternalError(
        absl::StrCat("Field ", field_path, " is null/not string"));
  }
  return *value;
}
// Reads the URI of the Redfish logs: the string at the JSON pointer
// redfish_logs_fields_ within the resource at redfish_logs_path_.
absl::StatusOr<std::string> BaseEventsManager::GetRedfishLogsUri() {
  absl::StatusOr<std::string> logs_uri =
      GetRedfishbyURI(redfish_logs_path_, redfish_logs_fields_);
  return logs_uri;
}
// Fetches the Redfish logs in two steps. First it resolves the URI that holds
// the logs (GetRedfishLogsUri). Then it GETs that URI and returns the raw
// response body.
absl::StatusOr<std::string> BaseEventsManager::GetRedfishLogs() {
  ASSIGN_OR_RETURN(std::string logs_location, GetRedfishLogsUri());
  // The root (empty) pointer asks GetRedfishbyURI for the whole body.
  const nlohmann::json::json_pointer whole_body = ""_json_pointer;
  return GetRedfishbyURI(logs_location, whole_body);
}
// Returns the SSE subscription URI, read from the "ServerSentEventUri" field
// of the Redfish EventService resource.
absl::StatusOr<std::string> BaseEventsManager::GetRedfishSSEUri() {
  return GetRedfishbyURI(kRedfishEventServicePath,
                         "/ServerSentEventUri"_json_pointer);
}
// Called when the serving-state deadline passes (presumably by
// serving_state_timer_). Records a warning-level internal event unless the
// plugin is already serving.
void BaseEventsManager::ServingStateTimeout() {
  if (IsServing()) return;
  machine_state_->UpdateInternalEvent("Reached serving state timeout",
                                      milotic::EventSeverity::kWarning);
}
// Returns a human-readable explanation of the current plugin state.
absl::string_view BaseEventsManager::ExplainPluginState() {
  PluginState current;
  {
    absl::MutexLock lock(mutex_);
    current = plugin_state_;
  }
  switch (current) {
    case PluginState::kInit:
      return "Init: Base state, Waiting for connection from eiger collector";
    case PluginState::kReplayDone:
      return "ReplayDone: Connected to eiger collector, will start SSE to RMC";
    case PluginState::kStreamStarting:
      return "StreamStarting: Trying to create SSE connection to RMC";
    case PluginState::kStreamStarted:
      return "StreamStarted: SSE connection to RMC created, waiting for events "
             "from RMC";
    case PluginState::kServing:
      return "Serving: vBMC is serving";
    case PluginState::kStop:
      return "Stop: vBMC is stopping";
    default:
      return "Unknown state";
  }
}
// Test helper: blocks until plugin_state_ equals `state`, or until `timeout`
// of real time elapses. Returns true if the state was reached.
bool BaseEventsManager::TESTONLY_AwaitState(PluginState state,
                                            absl::Duration timeout) {
  absl::MutexLock lock(mutex_);
  auto reached = [this, state]() ABSL_SHARED_LOCKS_REQUIRED(mutex_) {
    return plugin_state_ == state;
  };
  return mutex_.AwaitWithTimeout(absl::Condition(&reached), timeout);
}
// Test helper: polls machine_state_->health() every 50ms of real time until it
// equals `health` (returns true) or `timeout` elapses (returns false).
bool BaseEventsManager::TESTONLY_AwaitHealth(MachineHealth health,
                                             absl::Duration timeout) const {
  constexpr absl::Duration kPollInterval = absl::Milliseconds(50);
  LOG(INFO) << "TESTONLY_AwaitHealth: " << MachineHealthToString(health)
            << " timeout: " << timeout;
  const absl::Time start = absl::Now();
  while (absl::Now() - start < timeout) {
    if (machine_state_->health() == health) {
      LOG(INFO) << "Reached health: " << MachineHealthToString(health)
                << " after " << absl::Now() - start;
      return true;
    }
    absl::SleepFor(kPollInterval);
  }
  LOG(ERROR) << "Timeout reached while waiting for health: "
             << MachineHealthToString(health) << " after " << timeout;
  return false;
}
// Test helper: blocks until processed_events_count_ is exactly `count`, or
// until `timeout` of real time elapses. Returns true if the count was reached.
bool BaseEventsManager::TESTONLY_AwaitEventCount(int count,
                                                 absl::Duration timeout) {
  VLOG(kVlogVerbosity) << "AwaitEventCount: " << count;
  absl::MutexLock lock(mutex_);
  auto count_reached = [this, count]() ABSL_SHARED_LOCKS_REQUIRED(mutex_) {
    return processed_events_count_ == count;
  };
  return mutex_.AwaitWithTimeout(absl::Condition(&count_reached), timeout);
}
// Test helper: overrides the SSE connection retry policy used by
// PullEventsFromRMC (max_attempts == 0 means retry forever).
void BaseEventsManager::TESTONLY_SetRmcConnectionRetryConfig(
    int max_attempts, absl::Duration retry_delay) {
  sse_connection_retry_delay_ = retry_delay;
  sse_connection_retry_max_attempts_ = max_attempts;
}
void BaseEventsManager::TESTONLY_SetServingStateTimeout(int64_t timeout_sec) {
serving_state_timeout_sec_ = timeout_sec;
}
// Returns the details of the last transition to a non-OK health as a JSON
// object with the keys "Timestamp", "EventId", "ReasonId", "EventSeverity" and
// "EventDetails". The string fields default to "". "EventDetails" is assigned
// `{}`, i.e. a JSON null. The defaults are returned as-is when health is OK or
// no transition event was recorded. A zero id, the Unix epoch timestamp, or
// unparsable event JSON also leaves the corresponding default in place.
nlohmann::json BaseMachineState::LastHealthChangeDetails() const {
  nlohmann::json details;
  for (const char* key : {"Timestamp", "EventId", "ReasonId", "EventSeverity"}) {
    details[key] = "";
  }
  details["EventDetails"] = {};

  const bool nothing_to_report =
      health_ == MachineHealth::kOk || event_last_health_transition_.IsEmpty();
  if (nothing_to_report) return details;

  const auto& event = event_last_health_transition_;
  if (timestamp_last_health_transition_ != absl::UnixEpoch()) {
    details["Timestamp"] = absl::FormatTime(timestamp_last_health_transition_,
                                            absl::UTCTimeZone());
  }
  if (event.event_id() != 0) {
    details["EventId"] = absl::StrCat(event.event_id());
  }
  if (event.reason_id() != 0) {
    details["ReasonId"] = absl::StrCat(event.reason_id());
  }
  details["EventSeverity"] = EventSeverityToString(event.severity());

  nlohmann::json event_json =
      nlohmann::json::parse(event.event_json(), /*cb=*/nullptr,
                            /*allow_exceptions=*/false);
  if (event_json.is_discarded()) {
    LOG(ERROR) << "Failed to parse event json: " << event.event_json();
  } else {
    details["EventDetails"] = std::move(event_json);
  }
  return details;
}
// Handles the HTTP response that opens the SSE stream. A non-200 response is
// returned as UnavailableError. Otherwise, if the init-wait timer is not
// running, it is started and the state becomes kStreamStarted. If the timer is
// already running, the state becomes kServing. The second SetState argument is
// presumably the expected current state (kStreamStarting).
absl::Status RMCEventHandler::OnResponse(const ProxyResponse& response) {
  VLOG(kVlogVerbosity) << "OnResponse code: " << response.GetCode();
  if (response.GetCode() != HTTP_CODE_REQUEST_OK) {
    return absl::UnavailableError(
        absl::StrCat("SSE Response code: ", response.GetCode(),
                     " body: ", response.GetBody()));
  }
  if (!init_wait_timer_.IsRunning()) {
    VLOG(kVlogVerbosity) << "Starting init timer";
    RETURN_IF_ERROR(init_wait_timer_.Start(init_timer_start_wait_));
    events_manager_->SetState(PluginState::kStreamStarted,
                              PluginState::kStreamStarting);
  } else {
    events_manager_->SetState(PluginState::kServing,
                              PluginState::kStreamStarting);
  }
  return absl::OkStatus();
}
// Protobuf-message events are not supported on the RMC SSE stream.
absl::Status RMCEventHandler::OnEvent(
    const google::protobuf::Message& /*event_message*/) {
  return absl::UnimplementedError("Not implemented");
}
// Handles one server-sent event from the RMC. Heartbeats (no data) are ignored.
// Until the plugin is serving, each data event restarts the init-wait timer.
// The data is parsed into milotic::Events, and each one is enqueued as an
// EventJob processed by BaseEventsManager::ProcessEvent. Parse and enqueue
// failures are logged rather than returned, so one bad event does not end the
// stream; the function always returns OK.
absl::Status RMCEventHandler::OnEvent(const milotic::ServerSentEvent& event) {
  if (event.id.has_value() && !event.data.has_value()) {
    LOG(ERROR) << "Received event has Id but no data: " << event.id.value();
    return absl::OkStatus();
  }
  // Ignore heartbeat calls.
  if (!event.data.has_value()) return absl::OkStatus();
  if (!events_manager_->IsServing() && !events_manager_->IsStopping()) {
    absl::Status status = init_wait_timer_.Restart(init_timer_restart_wait_);
    if (!status.ok()) {
      // We only log and ignore the error here since restart can fail because
      // of timer expiry, and we don't want to miss the incoming event.
      VLOG(kVlogVerbosity) << "init_wait_timer_.Restart failed: " << status;
    }
  }
  std::string sse_id;
  if (event.id.has_value()) {
    VLOG(kVlogVerbosity) << "Received event with sse id: " << event.id.value();
    sse_id = event.id.value();
  }
  const auto& data = event.data.value();
  absl::StatusOr<std::vector<milotic::Event>> rf_events =
      events_manager_->ParseSSEvent(data, sse_id);
  if (!rf_events.ok()) {
    LOG(ERROR) << "Failed to parse redfish event: " << rf_events.status()
               << " event: " << data;
    return absl::OkStatus();
  }
  for (const auto& rf_event : *rf_events) {
    auto job = std::make_unique<EventJob>(rf_event);
    absl::Status status = queue_->Enqueue(
        std::move(job),
        absl::bind_front(&BaseEventsManager::ProcessEvent, events_manager_),
        kEventsPriority);
    if (!status.ok()) {
      LOG(ERROR) << "Failed to enqueue event: " << rf_event.event_id()
                 << " status: " << status;
    }
  }
  return absl::OkStatus();
}
// Reports the subscription as cancelled whenever the events manager is
// stopping. It is presumably polled by the proxy while Subscribe() runs.
bool RMCEventHandler::IsCancelled() const {
  const bool stopping = events_manager_->IsStopping();
  return stopping;
}
// Wait (using timer) is added to ensure the RMC sends all the pending events
// during the time connection was down (vbmc restarting/machine under repair
// etc). This is the contract with the vendor that all the pending events should
// start within 30 sec of starting the SSE connection and there would not be
// more than 10 sec interval b/w each event.
// Edge cases of a single event landing in Init state vs Serving is not relevant
void RMCEventHandler::InitTimeoutExpired() {
static absl::NoDestructor<voyager::Job> job;
absl::Status status = queue_->Enqueue(
*job,
absl::bind_front(&BaseEventsManager::ProcessInitExpiry, events_manager_),
kEventsPriority);
VLOG(kVlogVerbosity) << "InitTimeoutExpired exit: " << status;
}
// Maps an event severity to the machine health it implies. Severities other
// than Warning, Emergent and Critical map to kOk.
MachineHealth GetMachineHealth(EventSeverity severity) {
  MachineHealth health = MachineHealth::kOk;
  switch (severity) {
    case EventSeverity::kWarning:
      health = MachineHealth::kWarning;
      break;
    case EventSeverity::kEmergent:
      health = MachineHealth::kEmergent;
      break;
    case EventSeverity::kCritical:
      health = MachineHealth::kCritical;
      break;
    default:
      break;
  }
  return health;
}
// Returns the display name of `health`, or "Unknown" for unrecognized values.
std::string MachineHealthToString(MachineHealth health) {
  const char* name = "Unknown";
  switch (health) {
    case MachineHealth::kOk:
      name = "Ok";
      break;
    case MachineHealth::kWarning:
      name = "Warning";
      break;
    case MachineHealth::kEmergent:
      name = "Emergent";
      break;
    case MachineHealth::kCritical:
      name = "Critical";
      break;
    default:
      break;
  }
  return std::string(name);
}
// Returns the short name of `state`, or "Unknown" for unrecognized values.
absl::string_view PluginStateToString(PluginState state) {
  absl::string_view name = "Unknown";
  switch (state) {
    case PluginState::kInit:
      name = "Init";
      break;
    case PluginState::kReplayDone:
      name = "ReplayDone";
      break;
    case PluginState::kStreamStarting:
      name = "StreamStarting";
      break;
    case PluginState::kStreamStarted:
      name = "StreamStarted";
      break;
    case PluginState::kServing:
      name = "Serving";
      break;
    case PluginState::kStop:
      name = "Stop";
      break;
    default:
      break;
  }
  return name;
}
} // namespace milotic