| #include "sse_plugin/events_manager.h" |
| |
| #include <cstddef> |
| #include <cstdint> |
| #include <cstdlib> |
| #include <memory> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "absl/base/no_destructor.h" |
| #include "absl/base/thread_annotations.h" |
| #include "absl/functional/bind_front.h" |
| #include "absl/log/check.h" |
| #include "absl/log/log.h" |
| #include "absl/status/status.h" |
| #include "absl/status/statusor.h" |
| #include "absl/strings/str_cat.h" |
| #include "absl/strings/string_view.h" |
| #include "absl/synchronization/mutex.h" |
| #include "absl/time/clock.h" |
| #include "utils/clock_interface.h" |
| #include "absl/time/time.h" |
| #include "redfish_query_engine/http/codes.h" |
| #include "nlohmann/json.hpp" |
| #include "nlohmann/json_fwd.hpp" |
| #include "metrics.h" |
| #include "redfish_plugin.h" |
| #include "request_response.h" |
| #include "sse/sse_parser.h" |
| #include "sse_plugin/event.h" |
| #include "vendor_events.pb.h" |
| #include "utils/std_thread.h" |
| #include "utils/status_macros.h" |
| #include "utils/thread.h" |
| #include "utils/timer.h" |
| #include "voyager/priority_queue.hpp" |
| |
| namespace milotic { |
| |
namespace {
// Verbosity level used by every VLOG statement in this file.
// constexpr (rather than const) makes the compile-time nature explicit.
constexpr int kVlogVerbosity = 1;
}  // namespace
| |
// The only HTTP status code this file treats as success.
using ::ecclesia::HttpResponseCode::HTTP_CODE_REQUEST_OK;

// Redfish EventService resource; queried to discover the SSE stream URI.
constexpr absl::string_view kRedfishEventServicePath =
    "/redfish/v1/EventService";
| |
// Receives callbacks for the SSE subscription to the RMC: validates the
// initial HTTP response, converts incoming server-sent events into jobs on
// the shared priority queue, and reports cancellation when the events
// manager is stopping.
class RMCEventHandler : public ::milotic::RedfishPlugin::EventHandler {
 public:
  // `events_manager` and `queue` are borrowed, not owned; both must outlive
  // this handler. The two wait values configure the init-wait timer used to
  // decide when the pending-event stream from the RMC has gone quiet (see
  // the comment above InitTimeoutExpired's definition).
  explicit RMCEventHandler(BaseEventsManager* events_manager,
                           voyager::TelemetryPriorityQueue* queue,
                           size_t init_timer_start_wait_sec,
                           size_t init_timer_restart_wait_sec,
                           absl::Clock& clock)
      : events_manager_(events_manager),
        queue_(queue),
        init_wait_timer_(
            absl::bind_front(&RMCEventHandler::InitTimeoutExpired, this),
            "InitWaitTimer", clock),
        init_timer_start_wait_(absl::Seconds(init_timer_start_wait_sec)),
        init_timer_restart_wait_(absl::Seconds(init_timer_restart_wait_sec)) {}

  ~RMCEventHandler() override = default;
  // Validates the HTTP response that opens the SSE stream and advances the
  // plugin state.
  absl::Status OnResponse(const ProxyResponse& response) override;
  // Proto-message overload; not implemented for this handler.
  absl::Status OnEvent(const google::protobuf::Message& event_message) override;
  // Parses one server-sent event and enqueues the resulting event jobs.
  absl::Status OnEvent(const milotic::ServerSentEvent& event) override;
  // True once the events manager is stopping; ends the subscription.
  bool IsCancelled() const override;

 private:
  // Timer callback fired when no event arrives within the init wait.
  void InitTimeoutExpired();

  BaseEventsManager* events_manager_;       // not owned
  voyager::TelemetryPriorityQueue* queue_;  // not owned
  Timer init_wait_timer_;
  const absl::Duration init_timer_start_wait_;
  const absl::Duration init_timer_restart_wait_;
};
| |
| void BaseEventsManager::StartMainThread() { |
| DCHECK(!main_thread_); |
| main_thread_ = |
| std::make_unique<StdThread>("MainThread", [this] { MainThreadRun(); }); |
| absl::Status status = main_thread_->Start(); |
| if (!status.ok()) { |
| LOG(ERROR) << "Failed to start main thread, status: " << status; |
| } |
| } |
| |
| // MainThreadRun does the following |
| // - Get existing events from StorageManager and replays them |
| // - Start thread to stream new events from RMC and put them in EventQ |
| // - Process events from EventQ sequentially |
| void BaseEventsManager::MainThreadRun() { |
| VLOG(kVlogVerbosity) << "MainThreadRun"; |
| |
| if (absl::Status status = ReplayProcessedEvents(); !status.ok()) { |
| LOG(ERROR) << "Failed to replay processed events, status: " << status; |
| return; |
| } |
| VLOG(kVlogVerbosity) << "Replay done"; |
| |
| if (absl::Status status = |
| serving_state_timer_.Start(absl::Seconds(serving_state_timeout_sec_)); |
| !status.ok()) { |
| LOG(ERROR) << "Failed to start serving state timer, status: " << status; |
| return; |
| } |
| |
| // start StreamThread (gets events from from RMC and adds to Q). |
| stream_thread_ = std::make_unique<StdThread>("StreamThread", |
| [this] { PullEventsFromRMC(); }); |
| absl::Status status = stream_thread_->Start(); |
| if (!status.ok()) { |
| LOG(ERROR) << "Failed to start stream thread, status: " << status; |
| return; |
| } |
| |
| LOG(INFO) << "Stream thread started"; |
| |
| ProcessEventQueue(); |
| } |
| |
// Sleeps for `duration`, waking early if the plugin transitions to kStop.
// Returns the value of clock_.AwaitWithDeadline — presumably whether the
// stop condition became true before the deadline expired (TODO confirm
// against the ClockInterface contract).
bool BaseEventsManager::InterruptibleSleep(absl::Duration duration) {
  auto is_stopping = [this]() ABSL_SHARED_LOCKS_REQUIRED(mutex_) {
    return plugin_state_ == PluginState::kStop;
  };
  absl::MutexLock lock(mutex_);
  // The injected clock is used for both "now" and the wait so that tests can
  // control time.
  return clock_.AwaitWithDeadline(&mutex_, absl::Condition(&is_stopping),
                                  clock_.TimeNow() + duration);
}
| |
| // Replay existing non-archived events to bootstrap machine state. |
| absl::Status BaseEventsManager::ReplayProcessedEvents() { |
| // read the existing events |
| LOG(INFO) << "Retrieving pocessed events"; |
| ASSIGN_OR_RETURN(std::vector<milotic::Event> events, |
| storage_manager_->RetrieveProcessedEvents()); |
| LOG(INFO) << "Retrieved " << events.size() << " events for replay"; |
| |
| for (milotic::Event& event : events) { |
| machine_state_->UpdateStatus(event, /*is_replay=*/true); |
| } |
| SetState(PluginState::kReplayDone, PluginState::kInit); |
| return absl::OkStatus(); |
| } |
| |
| // Creates a long running SSE connection to RMC and adds received events to |
| // EventQ and handles heartbeat. |
| // Retry on connection failures to RMC server. |
| void BaseEventsManager::PullEventsFromRMC() { |
| VLOG(kVlogVerbosity) << "PullEventsFromRMC"; |
| // create persistent connection to RMC and wait for events and add to Q |
| |
| auto rmc_handler = |
| RMCEventHandler(this, priority_queue_, init_timer_start_wait_sec_, |
| init_timer_restart_wait_sec_, clock_); |
| |
| int attempt = 1; |
| while ((sse_connection_retry_max_attempts_ == 0 || |
| attempt <= sse_connection_retry_max_attempts_) && |
| !IsStopping()) { |
| // Can transition from Serving/ReplayDone |
| SetState(PluginState::kStreamStarting); |
| LOG(INFO) << "Getting SSE Uri, attempt: " << attempt; |
| absl::StatusOr<std::string> sse_uri_response; |
| sse_uri_response = GetRedfishSSEUri(); |
| if (!sse_uri_response.ok()) { |
| LOG(ERROR) << "Failed to get SSE Uri, status: " |
| << sse_uri_response.status(); |
| if ((sse_connection_retry_max_attempts_ == 0 || |
| attempt <= sse_connection_retry_max_attempts_) && |
| !IsStopping()) { |
| InterruptibleSleep(sse_connection_retry_delay_); |
| } |
| |
| attempt += 1; |
| continue; |
| } |
| |
| // Wait for the events queue to clear before creating new connection |
| // This is to ensure we don't process duplicate events which have been |
| // already received by vBMC. |
| if (!priority_queue_->Empty()) { |
| VLOG(kVlogVerbosity) << "Event queue is not empty, waiting to clear"; |
| voyager::Job job; |
| auto job_run = [](voyager::Job& job) -> absl::Status { |
| return absl::OkStatus(); |
| }; |
| CHECK_OK(priority_queue_->Enqueue(job, job_run, kEventsPriority)); |
| job.Wait(); |
| } |
| |
| // For handling surrogate machines where SSE is not supported |
| if (skip_sse_connection_) { |
| LOG(INFO) << "ignoring SSE connection to RMC"; |
| rmc_handler.OnResponse(ProxyResponse(HTTP_CODE_REQUEST_OK)).IgnoreError(); |
| while (!IsStopping()) { |
| InterruptibleSleep(sse_connection_retry_delay_); |
| VLOG(kVlogVerbosity) << "Validing connection to RMC"; |
| sse_uri_response = GetRedfishSSEUri(); |
| if (!sse_uri_response.ok()) { |
| LOG(ERROR) << "Failed to get SSE Uri, status: " |
| << sse_uri_response.status(); |
| break; |
| } |
| } |
| continue; |
| } |
| |
| LOG(INFO) << "Calling Subscribe, with LastEventId: " |
| << machine_state_->LastEventId() << " attempt: " << attempt; |
| std::unique_ptr<ProxyRequest> request = |
| proxy_->CreateRequest(*sse_uri_response); |
| request->AddHeader("Accept", sse_accept_header_); |
| request->AddHeader("Last-Event-Id", |
| absl::StrCat(machine_state_->LastEventId())); |
| |
| absl::Status status = proxy_->Subscribe(std::move(request), &rmc_handler); |
| VLOG(kVlogVerbosity) << "Subscribe ended with status: " << status; |
| if (!status.ok()) { |
| attempt += 1; |
| } else { |
| // reset attempt count on subscribe connection end with OK status. |
| attempt = 1; |
| } |
| // TODO: b/297219058 - Use retry delay provided by RMC if available. |
| |
| if ((sse_connection_retry_max_attempts_ == 0 || |
| attempt <= sse_connection_retry_max_attempts_) && |
| !IsStopping()) { |
| InterruptibleSleep(sse_connection_retry_delay_); |
| } |
| } |
| VLOG(kVlogVerbosity) << "PullEventsFromRMC exiting"; |
| } |
| |
| // ProcessEventQueue waits on the EventQ for new events and |
| // processes them sequentially. |
| void BaseEventsManager::ProcessEventQueue() { |
| VLOG(kVlogVerbosity) << "ProcessEventQueue"; |
| while (!IsStopping()) { |
| VLOG(kVlogVerbosity) << "waiting for event in ProcessQueue"; |
| if (absl::Status status = priority_queue_->ProcessQueue(1); !status.ok()) { |
| LOG(ERROR) << "Failed to process queue, status: " << status; |
| return; |
| } |
| { |
| absl::MutexLock lock(mutex_); |
| processed_events_count_++; |
| } |
| } |
| } |
| |
| void BaseEventsManager::Shutdown() { |
| VLOG(kVlogVerbosity) << "Shutdown"; |
| serving_state_timer_.End(); |
| SetState(PluginState::kStop); |
| priority_queue_->InterruptProcessing(); |
| if (stream_thread_ != nullptr) { |
| stream_thread_->Join(); |
| stream_thread_ = nullptr; |
| } |
| } |
| |
// Fully restarts the events pipeline: stops both worker threads, clears
// queued work and in-memory machine state, then relaunches the main thread
// from kInit. A watchdog (reset_timer_ -> ResetTimeout) force-exits the
// process if this sequence hangs.
void BaseEventsManager::Reset() {
  LOG(INFO) << "Resetting BaseEventsManager";

  // Arm the watchdog before doing anything that can block.
  CHECK_OK(reset_timer_.Start(absl::Seconds(reset_timeout_sec_)));

  VLOG(kVlogVerbosity) << "Setting plugin state to kStop";
  SetState(PluginState::kStop);

  // Stop the producer first so no new events enter the queue.
  LOG(INFO) << "Joining stream thread";
  if (stream_thread_ != nullptr) {
    stream_thread_->Join();
    stream_thread_ = nullptr;
    VLOG(kVlogVerbosity) << "Stream thread joined";
  }

  // Unblock and drain the consumer side before joining the main thread.
  VLOG(kVlogVerbosity) << "Resetting priority queue";
  priority_queue_->InterruptProcessing();
  priority_queue_->Reset();

  LOG(INFO) << "Joining main thread";
  if (main_thread_ != nullptr) {
    main_thread_->Join();
    main_thread_ = nullptr;
    VLOG(kVlogVerbosity) << "Main thread joined";
  }
  VLOG(kVlogVerbosity) << "Resetting serving state timer";
  serving_state_timer_.Reset();
  VLOG(kVlogVerbosity) << "Resetting machine state";
  machine_state_->Reset();
  fetch_logs_on_init_expiry_ = false;

  // Teardown complete; disarm the watchdog before restarting.
  reset_timer_.Reset();

  VLOG(kVlogVerbosity) << "Setting plugin state to kInit";
  SetState(PluginState::kInit, PluginState::kStop);
  {
    absl::MutexLock lock(mutex_);
    processed_events_count_ = 0;
  }
  LOG(INFO) << "Starting main thread";
  StartMainThread();
}
| |
| void BaseEventsManager::ResetTimeout() { |
| LOG(ERROR) |
| << "Timeout reached while waiting for BaseEventsManager Reset, force " |
| "killing vBMC"; |
| CommonMetrics::Get().events_manager_reset_timeout.Increment({}); |
| exit(1); |
| } |
| |
// Stops all worker threads before member destruction begins.
// NOTE(review): if Shutdown() is virtual, the call here resolves to the base
// implementation (virtual dispatch is disabled in destructors) — confirm
// that is the intended behavior for derived managers.
BaseEventsManager::~BaseEventsManager() {
  Shutdown();
  // Shutdown joins only the stream thread; the main thread is joined here.
  if (main_thread_ != nullptr) {
    main_thread_->Join();
    main_thread_ = nullptr;
  }
}
| |
| void BaseEventsManager::FetchAndSaveLogs() { |
| absl::StatusOr<std::string> log_contents = GetRedfishLogs(); |
| if (!log_contents.ok()) { |
| // TODO: b/293333696 - handle failures |
| LOG(ERROR) << "Failed to get redfish logs: " << log_contents.status(); |
| return; |
| } |
| absl::Status status = storage_manager_->SaveLogs( |
| machine_state_->LastHealthChangeEvent(), *log_contents); |
| if (!status.ok()) { |
| if (absl::IsCancelled(status)) { |
| LOG(ERROR) << "Got cancelled status from storage manager, disconnecting"; |
| storage_manager_->Disconnect(); |
| return; |
| } |
| LOG(ERROR) << "SaveLogs failed: " << status; |
| } |
| } |
| |
| absl::StatusOr<std::string> BaseEventsManager::GetRedfishbyURI( |
| absl::string_view redfish_uri, const nlohmann::json::json_pointer& field) { |
| // TODO: b/294860123 - Add metrics for each uri using hash table with counters |
| VLOG(kVlogVerbosity) << "GetRedfishbyURI: " << redfish_uri |
| << " field: " << field.to_string(); |
| |
| std::unique_ptr<ProxyRequest> request = proxy_->CreateRequest(redfish_uri); |
| |
| ProxyResponse response; |
| ASSIGN_OR_RETURN(response, proxy_->DispatchRequest( |
| milotic::RedfishPlugin::RequestVerb::kGet, |
| std::move(request))); |
| |
| if (response.GetCode() != HTTP_CODE_REQUEST_OK) { |
| return absl::InternalError(absl::StrCat( |
| "GET failed from path `", redfish_uri, "`, response code: ", |
| response.GetCode(), " ,response body: ", response.GetBody())); |
| } |
| |
| if (response.GetBody().empty()) |
| return absl::InternalError("Response body is empty"); |
| |
| if (field.to_string().empty()) { |
| return std::string(response.GetBody()); |
| } |
| |
| nlohmann::json parsed = response.GetBodyJson(); |
| if (parsed.is_discarded()) |
| return absl::InternalError("Failed to parse response body"); |
| |
| auto* value = parsed[field].get_ptr<std::string*>(); |
| if (value == nullptr) { |
| return absl::InternalError( |
| absl::StrCat("Field ", field.to_string(), " is null/not string")); |
| } |
| return *value; |
| } |
| |
// Resolves the URI of the Redfish log resource by reading the configured
// field pointer out of the configured logs path.
absl::StatusOr<std::string> BaseEventsManager::GetRedfishLogsUri() {
  return GetRedfishbyURI(redfish_logs_path_, redfish_logs_fields_);
}
| |
// Getting Redfish logs is a 2 step process, first we get the URI where logs
// are present and then we can get logs from the URI in response.
absl::StatusOr<std::string> BaseEventsManager::GetRedfishLogs() {
  ASSIGN_OR_RETURN(std::string logs_uri, GetRedfishLogsUri());
  // Empty JSON pointer -> GetRedfishbyURI returns the raw response body.
  return GetRedfishbyURI(logs_uri, ""_json_pointer);
}
| |
| absl::StatusOr<std::string> BaseEventsManager::GetRedfishSSEUri() { |
| nlohmann::json::json_pointer path = "/ServerSentEventUri"_json_pointer; |
| return GetRedfishbyURI(kRedfishEventServicePath, path); |
| } |
| |
| void BaseEventsManager::ServingStateTimeout() { |
| if (!IsServing()) { |
| machine_state_->UpdateInternalEvent("Reached serving state timeout", |
| milotic::EventSeverity::kWarning); |
| } |
| } |
| |
// Returns a human-readable description of the current plugin state. All
// returned views point at string literals, so they stay valid after the
// lock is released.
absl::string_view BaseEventsManager::ExplainPluginState() {
  absl::MutexLock lock(mutex_);
  switch (plugin_state_) {
    case PluginState::kInit:
      return "Init: Base state, Waiting for connection from eiger collector";
    case PluginState::kReplayDone:
      return "ReplayDone: Connected to eiger collector, will start SSE to RMC";
    case PluginState::kStreamStarting:
      return "StreamStarting: Trying to create SSE connection to RMC";
    case PluginState::kStreamStarted:
      return "StreamStarted: SSE connection to RMC created, waiting for events "
             "from RMC";
    case PluginState::kServing:
      return "Serving: vBMC is serving";
    case PluginState::kStop:
      return "Stop: vBMC is stopping";
    default:
      return "Unknown state";
  }
}
| |
| bool BaseEventsManager::TESTONLY_AwaitState(PluginState state, |
| absl::Duration timeout) { |
| auto check_state = [this, state]() ABSL_SHARED_LOCKS_REQUIRED(mutex_) { |
| return plugin_state_ == state; |
| }; |
| absl::MutexLock lock(mutex_); |
| return mutex_.AwaitWithTimeout(absl::Condition(&check_state), timeout); |
| } |
| |
| bool BaseEventsManager::TESTONLY_AwaitHealth(MachineHealth health, |
| absl::Duration timeout) const { |
| LOG(INFO) << "TESTONLY_AwaitHealth: " << MachineHealthToString(health) |
| << " timeout: " << timeout; |
| auto start_time = absl::Now(); |
| while (absl::Now() - start_time < timeout) { |
| if (machine_state_->health() == health) { |
| LOG(INFO) << "Reached health: " << MachineHealthToString(health) |
| << " after " << absl::Now() - start_time; |
| return true; |
| } |
| absl::SleepFor(absl::Milliseconds(50)); |
| } |
| LOG(ERROR) << "Timeout reached while waiting for health: " |
| << MachineHealthToString(health) << " after " << timeout; |
| return false; |
| } |
| |
| bool BaseEventsManager::TESTONLY_AwaitEventCount(int count, |
| absl::Duration timeout) { |
| VLOG(kVlogVerbosity) << "AwaitEventCount: " << count; |
| auto check_event_count = [this, count]() ABSL_SHARED_LOCKS_REQUIRED(mutex_) { |
| return processed_events_count_ == count; |
| }; |
| absl::MutexLock lock(mutex_); |
| return mutex_.AwaitWithTimeout(absl::Condition(&check_event_count), timeout); |
| } |
| |
// Test hook: overrides the SSE reconnect policy used by PullEventsFromRMC.
// A max_attempts of 0 means retry forever.
void BaseEventsManager::TESTONLY_SetRmcConnectionRetryConfig(
    int max_attempts, absl::Duration retry_delay) {
  sse_connection_retry_max_attempts_ = max_attempts;
  sse_connection_retry_delay_ = retry_delay;
}
| |
// Test hook: overrides how long the plugin may take to reach the serving
// state before ServingStateTimeout records a warning event.
void BaseEventsManager::TESTONLY_SetServingStateTimeout(int64_t timeout_sec) {
  serving_state_timeout_sec_ = timeout_sec;
}
| |
// Return the timestamp and events details when health changed to !OK last in
// json format.
nlohmann::json BaseMachineState::LastHealthChangeDetails() const {
  // Start with placeholders so the returned object always carries the same
  // set of keys regardless of which fields can be populated.
  nlohmann::json health_change;
  health_change["Timestamp"] = "";
  health_change["EventId"] = "";
  health_change["ReasonId"] = "";
  health_change["EventSeverity"] = "";
  // NOTE(review): assigning `{}` here — presumably intended as an empty
  // placeholder; confirm nlohmann produces the expected value for a braced
  // empty initializer in this position.
  health_change["EventDetails"] = {};
  // If health is Ok or we don't have health transition event,
  // return empty values.
  if (health_ == MachineHealth::kOk ||
      event_last_health_transition_.IsEmpty()) {
    return health_change;
  }

  // Otherwise validate and populate each value.
  // UnixEpoch is the sentinel for "no transition timestamp recorded".
  if (timestamp_last_health_transition_ != absl::UnixEpoch()) {
    health_change["Timestamp"] = absl::FormatTime(
        timestamp_last_health_transition_, absl::UTCTimeZone());
  }
  // Zero ids are treated as "unset" and left as empty strings.
  if (event_last_health_transition_.event_id() != 0) {
    health_change["EventId"] =
        absl::StrCat(event_last_health_transition_.event_id());
  }
  if (event_last_health_transition_.reason_id() != 0) {
    health_change["ReasonId"] =
        absl::StrCat(event_last_health_transition_.reason_id());
  }
  health_change["EventSeverity"] =
      EventSeverityToString(event_last_health_transition_.severity());

  // Parse without exceptions; a discarded result leaves EventDetails as the
  // placeholder and only logs the failure.
  nlohmann::json parsed = nlohmann::json::parse(
      event_last_health_transition_.event_json(), /*cb=*/nullptr,
      /*allow_exceptions=*/false);
  if (!parsed.is_discarded()) {
    health_change["EventDetails"] = std::move(parsed);
  } else {
    LOG(ERROR) << "Failed to parse event json: "
               << event_last_health_transition_.event_json();
  }
  return health_change;
}
| |
// Called with the HTTP response that opens (or re-opens) the SSE stream.
// A non-OK code aborts the subscription with Unavailable. On the first
// successful connection the init-wait timer is started and the plugin moves
// StreamStarting -> StreamStarted; if the timer is already running
// (presumably a reconnect while the initial replay window is still open),
// the plugin moves StreamStarting -> Serving — confirm this transition is
// the intended reconnect behavior.
absl::Status RMCEventHandler::OnResponse(const ProxyResponse& response) {
  VLOG(kVlogVerbosity) << "OnResponse body: " << response.GetCode();
  if (response.GetCode() != HTTP_CODE_REQUEST_OK) {
    return absl::UnavailableError(
        absl::StrCat("SSE Response code:", response.GetCode(),
                     " body: ", response.GetBody()));
  }
  if (!init_wait_timer_.IsRunning()) {
    VLOG(kVlogVerbosity) << "Starting init timer";
    RETURN_IF_ERROR(init_wait_timer_.Start(init_timer_start_wait_));
    events_manager_->SetState(PluginState::kStreamStarted,
                              PluginState::kStreamStarting);
  } else {
    events_manager_->SetState(PluginState::kServing,
                              PluginState::kStreamStarting);
  }
  return absl::OkStatus();
}
// Proto-message events are not supported by this handler; RMC events arrive
// through the ServerSentEvent overload instead.
absl::Status RMCEventHandler::OnEvent(const google::protobuf::Message& event_message) {
  return absl::UnimplementedError("Not implemented");
}
| |
| absl::Status RMCEventHandler::OnEvent(const milotic::ServerSentEvent& event) { |
| if (event.id.has_value() && !event.data.has_value()) { |
| LOG(ERROR) << "Received event has Id but no data" << event.id.value(); |
| return absl::OkStatus(); |
| } |
| |
| // Ignore heartbeat calls. |
| if (!event.data.has_value()) return absl::OkStatus(); |
| |
| if (!events_manager_->IsServing() && !events_manager_->IsStopping()) { |
| absl::Status status = init_wait_timer_.Restart(init_timer_restart_wait_); |
| if (!status.ok()) { |
| // We only log and ignore the error here since restart can fail because |
| // of timer expiry, and we don't want to miss the incoming event. |
| VLOG(kVlogVerbosity) << "init_wait_timer_.Restart failed: " << status; |
| } |
| } |
| |
| std::string sse_id; |
| if (event.id.has_value()) { |
| VLOG(kVlogVerbosity) << "Received event with sse id: " << event.id.value(); |
| sse_id = event.id.value(); |
| } |
| |
| // create EventJob from event. |
| absl::StatusOr<std::vector<milotic::Event>> rf_events = |
| events_manager_->ParseSSEvent(event.data.value(), sse_id); |
| if (!rf_events.ok()) { |
| LOG(ERROR) << "Failed to parse redfish event: " << rf_events.status() |
| << "event: " << event.data.value(); |
| return absl::OkStatus(); |
| } |
| |
| for (const auto& rf_event : *rf_events) { |
| auto job = std::make_unique<EventJob>(rf_event); |
| absl::Status status = queue_->Enqueue( |
| std::move(job), |
| absl::bind_front(&BaseEventsManager::ProcessEvent, events_manager_), |
| kEventsPriority); |
| if (!status.ok()) { |
| LOG(ERROR) << "Failed to enqueue event: " << rf_event.event_id() |
| << " status: " << status; |
| } |
| } |
| return absl::OkStatus(); |
| } |
| |
// The subscription is considered cancelled as soon as the events manager
// begins stopping, which lets the proxy end the blocking Subscribe call in
// PullEventsFromRMC.
bool RMCEventHandler::IsCancelled() const {
  return events_manager_->IsStopping();
}
| |
// Wait (using timer) is added to ensure the RMC sends all the pending events
// during the time connection was down (vbmc restarting/machine under repair
// etc). This is the contract with the vendor that all the pending events should
// start within 30 sec of starting the SSE connection and there would not be
// more than 10 sec interval b/w each event.
// Edge cases of a single event landing in Init state vs Serving is not relevant
void RMCEventHandler::InitTimeoutExpired() {
  // A single static Job is reused across expirations; NoDestructor keeps it
  // alive for the process lifetime without a destructor at exit.
  // NOTE(review): this assumes a previous expiry's job is done before the
  // timer can fire again — confirm the timer cannot overlap its callbacks.
  static absl::NoDestructor<voyager::Job> job;
  absl::Status status = queue_->Enqueue(
      *job,
      absl::bind_front(&BaseEventsManager::ProcessInitExpiry, events_manager_),
      kEventsPriority);
  VLOG(kVlogVerbosity) << "InitTimeoutExpired exit: " << status;
}
| |
| MachineHealth GetMachineHealth(EventSeverity severity) { |
| switch (severity) { |
| case EventSeverity::kWarning: |
| return MachineHealth::kWarning; |
| case EventSeverity::kEmergent: |
| return MachineHealth::kEmergent; |
| case EventSeverity::kCritical: |
| return MachineHealth::kCritical; |
| default: |
| return MachineHealth::kOk; |
| } |
| } |
| |
| std::string MachineHealthToString(MachineHealth health) { |
| switch (health) { |
| case MachineHealth::kOk: |
| return "Ok"; |
| case MachineHealth::kWarning: |
| return "Warning"; |
| case MachineHealth::kEmergent: |
| return "Emergent"; |
| case MachineHealth::kCritical: |
| return "Critical"; |
| default: |
| return "Unknown"; |
| } |
| } |
| |
| absl::string_view PluginStateToString(PluginState state) { |
| switch (state) { |
| case PluginState::kInit: |
| return "Init"; |
| case PluginState::kReplayDone: |
| return "ReplayDone"; |
| case PluginState::kStreamStarting: |
| return "StreamStarting"; |
| case PluginState::kStreamStarted: |
| return "StreamStarted"; |
| case PluginState::kServing: |
| return "Serving"; |
| case PluginState::kStop: |
| return "Stop"; |
| default: |
| return "Unknown"; |
| } |
| } |
| |
| } // namespace milotic |