| #include "cper/plugin.h" |
| |
| #include <stdlib.h> |
| |
| #include <memory> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "google/protobuf/any.pb.h" |
| #include "absl/log/log.h" |
| #include "absl/status/status.h" |
| #include "absl/status/statusor.h" |
| #include "absl/strings/numbers.h" |
| #include "absl/strings/str_cat.h" |
| #include "absl/strings/string_view.h" |
| #include "absl/synchronization/mutex.h" |
| #include "absl/time/clock.h" |
| #include "absl/time/time.h" |
| #include "redfish_query_engine/http/codes.h" |
| #include "nlohmann/json.hpp" |
| #include "nlohmann/json_fwd.hpp" |
| #include "cper/cper_event.h" |
| #include "cper/metrics.h" |
| #include "monitoring.h" |
| #include "proxy.h" |
| #include "proxy_builder.h" |
| #include "redfish_plugin.h" |
| #include "request_response.h" |
| #include "sse_plugin/event.h" |
| #include "events.pb.h" |
| #include "sse_plugin/events_manager.h" |
| #include "utils/std_thread.h" |
| #include "voyager/priority_queue.hpp" |
| |
| namespace milotic { |
| |
| namespace { |
| constexpr int kVlogVerbosity = 1; |
| } // namespace |
| |
| constexpr absl::string_view kResetErrorCounters = |
| "/google/cper/reset_error_counters"; |
| constexpr absl::string_view kPostForceKillPath = "/google/cper/forcekill"; |
| constexpr absl::string_view kGetMachineHealthPath = |
| "/google/v1/Chassis/stellaris/Oem/Google/Health"; |
| constexpr absl::string_view kMachineHealthResourceName = "ots-cn-health-status"; |
| constexpr absl::string_view kRequestIdHeader = "vbmc-internal:ReqID"; |
| |
| using ::ecclesia::HttpResponseCode::HTTP_CODE_BAD_REQUEST; |
| using ::ecclesia::HttpResponseCode::HTTP_CODE_FORBIDDEN; |
| using ::ecclesia::HttpResponseCode::HTTP_CODE_REQUEST_OK; |
| using ::ecclesia::HttpResponseCode::HTTP_CODE_SERVICE_UNAV; |
| using ::ecclesia::HttpResponseCode::HTTP_CODE_UNAUTHORIZED; |
| |
| absl::Status CperEventsPlugin::Initialize(Proxy* proxy) { |
| VLOG(kVlogVerbosity) << "Initialize"; |
| proxy_ = proxy; |
| events_manager_.SetProxy(proxy); |
| // Don't need to init events manager for certain tests |
| if (init_events_manager_) events_manager_.StartMainThread(); |
| VLOG(kVlogVerbosity) << "Initialize done"; |
| return absl::OkStatus(); |
| } |
| |
| CperEventsPlugin::RequestAction CperEventsPlugin::PreprocessRequest( |
| RedfishPlugin::RequestVerb request_verb, ProxyRequest& request) { |
| std::vector<std::pair<RedfishPlugin::RequestVerb, absl::string_view>> |
| paths_to_handle = { |
| {RedfishPlugin::RequestVerb::kPut, put_processed_events_path_}, |
| {RedfishPlugin::RequestVerb::kSubscribe, subscribe_events_path_}, |
| {RedfishPlugin::RequestVerb::kGet, kGetMachineHealthPath}, |
| {RedfishPlugin::RequestVerb::kPost, kPostForceKillPath}, |
| {RedfishPlugin::RequestVerb::kPost, kResetErrorCounters}, |
| }; |
| return milotic::Proxy::GetRequestAction(request_verb, request, |
| paths_to_handle); |
| } |
| |
| absl::StatusOr<ProxyResponse> CperEventsPlugin::HandleRequest( |
| RedfishPlugin::RequestVerb verb, |
| std::unique_ptr<ProxyRequest> http_request) { |
| VLOG(kVlogVerbosity) << "Request path: " << http_request->GetPath(); |
| |
| // Making a copy to pass to latency monitor as request gets moved. |
| const std::string request_path = std::string(http_request->GetPath()); |
| LatencyMonitor latency_monitor(&CperMetrics::Get().cper_request_latency, |
| {VerbToString(verb), request_path}); |
| |
| if (verb == RedfishPlugin::RequestVerb::kPut && |
| request_path == put_processed_events_path_) { |
| return CaptureResponseCode( |
| HandlePutProcessedEvents(std::move(http_request)), |
| &CperMetrics::Get().cper_response_code, request_path, |
| /*field_values=*/{}); |
| } |
| |
| if (verb == RedfishPlugin::RequestVerb::kGet && |
| request_path == kGetMachineHealthPath) { |
| return CaptureResponseCode(HandleGetHealth(std::move(http_request)), |
| &CperMetrics::Get().cper_response_code, |
| request_path, |
| /*field_values=*/{}); |
| } |
| |
| if (verb == RedfishPlugin::RequestVerb::kPost && |
| request_path == kPostForceKillPath) { |
| return milotic::CaptureResponseCode( |
| HandlePostForceKill(std::move(http_request)), |
| &CperMetrics::Get().cper_response_code, request_path, |
| /*field_values=*/{}); |
| } |
| |
| if (verb == RedfishPlugin::RequestVerb::kPost && |
| request_path == kResetErrorCounters) { |
| return milotic::CaptureResponseCode( |
| HandleResetErrorCounters(std::move(http_request)), |
| &CperMetrics::Get().cper_response_code, request_path, |
| /*field_values=*/{}); |
| } |
| |
| return ProxyResponse(HTTP_CODE_BAD_REQUEST, |
| absl::StrCat("Unable to handle request on path: ", |
| http_request->GetPath()), |
| {{"Content-Type", "text/plain"}}); |
| } |
| |
| static void KillThreadRun() { |
| LOG(INFO) << "Kill vBMC started"; |
| absl::SleepFor(absl::Seconds(5)); |
| // go/totw/20 - Use quick_exit. |
| LOG(INFO) << "Killing vBMC"; |
| quick_exit(0); |
| } |
| |
| absl::StatusOr<milotic::ProxyResponse> CperEventsPlugin::HandlePostForceKill( |
| std::unique_ptr<milotic::ProxyRequest> request) { |
| milotic::ProxyResponse response(HTTP_CODE_REQUEST_OK, "", |
| {{"Content-Type", "text/plain"}}); |
| absl::MutexLock lock(kill_mutex_); |
| if (kill_thread_ == nullptr) { |
| kill_thread_ = |
| std::make_unique<StdThread>("KillThread", std::move(KillThreadRun)); |
| absl::Status status = kill_thread_->Start(); |
| if (!status.ok()) { |
| LOG(ERROR) << "Failed to start kill thread, status: " << status; |
| } |
| } |
| return response; |
| } |
| |
| absl::StatusOr<milotic::ProxyResponse> |
| CperEventsPlugin::HandleResetErrorCounters( |
| std::unique_ptr<milotic::ProxyRequest> request) { |
| milotic::ProxyResponse response(HTTP_CODE_REQUEST_OK, "", |
| {{"Content-Type", "text/plain"}}); |
| absl::MutexLock lock(reset_error_counters_mutex_); |
| if ((!events_manager_.IsServing() && !events_manager_.IsStopping())) { |
| response.SetCode(HTTP_CODE_SERVICE_UNAV); |
| response.SetBody(absl::StrCat("vBMC is Initializing, current state: ", |
| events_manager_.ExplainPluginState())); |
| return response; |
| } |
| if (!storage_manager_->SendHeartbeat().ok()) { |
| response.SetCode(HTTP_CODE_SERVICE_UNAV); |
| response.SetBody("vBMC is not connected to the collector"); |
| return response; |
| } |
| |
| // Archive events in the storage manager first and then reset. |
| LOG(INFO) << "Archiving processed events"; |
| absl::Status status = storage_manager_->ArchiveProcessedEvents(); |
| if (!status.ok()) { |
| response.SetCode(HTTP_CODE_SERVICE_UNAV); |
| response.SetBody( |
| absl::StrCat("Failed to archive processed events: ", status.message())); |
| return response; |
| } |
| LOG(INFO) << "Resetting storage manager"; |
| storage_manager_->Reset(); |
| |
| LOG(INFO) << "Resetting events manager"; |
| events_manager_.Reset(); |
| |
| response.SetBody("Reset completed"); |
| return response; |
| } |
| |
| absl::StatusOr<ProxyResponse> CperEventsPlugin::HandleGetHealth( |
| std::unique_ptr<ProxyRequest> request) { |
| ProxyResponse response(HTTP_CODE_BAD_REQUEST, "", |
| {{"Content-Type", "text/plain"}}); |
| if (events_manager_.IsServing() || events_manager_.IsStopping()) { |
| response.SetCode(HTTP_CODE_REQUEST_OK); |
| nlohmann::json json_response = { |
| {"@odata.id", kGetMachineHealthPath}, |
| {"@odata.type", "#OtsHealth.v1_0_0.OtsHealth"}, |
| {"Id", |
| kGetMachineHealthPath.substr(kGetMachineHealthPath.rfind('/') + 1)}, |
| {"Name", kMachineHealthResourceName}, |
| {"Description", "Rolled up health status for the OTS CN Arena"}, |
| }; |
| json_response["LastHealthChange"] = |
| events_manager_.machine_state_->LastHealthChangeDetails(); |
| json_response["Status"]["HealthRollup"] = |
| MachineHealthToString(events_manager_.machine_state_->health()); |
| |
| json_response["CnsDirPath"] = events_manager_.GetCnsDir(); |
| response.SetBody(json_response.dump()); |
| response.AddHeader("Content-Type", "application/json"); |
| response.AddHeader("OData-Version", "4.0"); |
| } else { |
| response.SetCode(HTTP_CODE_SERVICE_UNAV); |
| response.SetBody(absl::StrCat("vBMC is Initializing, current state: ", |
| events_manager_.ExplainPluginState())); |
| } |
| return response; |
| } |
| |
| absl::StatusOr<ProxyResponse> CperEventsPlugin::HandlePutProcessedEvents( |
| std::unique_ptr<ProxyRequest> request) { |
| ProxyResponse response(HTTP_CODE_BAD_REQUEST, "", |
| {{"Content-Type", "text/plain"}}); |
| |
| if (events_manager_.IsStopping()) { |
| response.SetBody("vBMC is stopping"); |
| return response; |
| } |
| |
| google::protobuf::Any any; |
| if (!any.ParseFromString(request->GetBody())) { |
| VLOG(kVlogVerbosity) << "Failed to parse Request body to proto Any" |
| << request->GetBody(); |
| response.SetBody("Failed to parse Request body to proto Any"); |
| return response; |
| } |
| platforms_vbmc::RequestData request_data; |
| if (!any.UnpackTo(&request_data)) { |
| VLOG(kVlogVerbosity) << "Failed to unpack to RequestData"; |
| response.SetBody("Failed to unpack to RequestData"); |
| return response; |
| } |
| |
| absl::string_view request_id = request->GetHeader(kRequestIdHeader); |
| if (request_id.empty()) { |
| VLOG(kVlogVerbosity) << "ReqID not present in request"; |
| response.SetBody("ReqID not present in request"); |
| return response; |
| } |
| if (!collector_request_id_filter_.empty() && |
| collector_request_id_filter_ != request_id) { |
| response.SetCode(HTTP_CODE_FORBIDDEN); |
| response.SetBody(absl::StrCat("Collector request id filter set in config: ", |
| collector_request_id_filter_, |
| " did not match requestID: ", request_id)); |
| VLOG(kVlogVerbosity) << response.GetBody(); |
| return response; |
| } |
| std::string connection_id = storage_manager_->ConnectionId(); |
| if (!connection_id.empty()) { |
| // If event handler id is already set |
| // Check if the stream connection is active |
| absl::Status status = storage_manager_->SendHeartbeat(); |
| if (status.ok()) { |
| // If the stream is active, we should ignore the request |
| // since its from a diff collector since each collector can only send |
| // one stream request |
| LOG_EVERY_N_SEC(WARNING, 10) |
| << "Got PUT request from collector with diff ID, ignoring"; |
| response.SetCode(HTTP_CODE_UNAUTHORIZED); |
| response.SetBody("vBMC already registered with another collector"); |
| return response; |
| } |
| // If the stream is not active, we should restart vBMC |
| LOG(INFO) << "Received processed events request from new collector, old " |
| "stream is not active, resetting events manager"; |
| LOG(INFO) << "Old ConnectionID: " << connection_id; |
| LOG(INFO) << "New ConnectionID: " << request_id; |
| LOG(INFO) << "Request data: " << request_data; |
| |
| LOG(INFO) << "Resetting storage manager"; |
| storage_manager_->Reset(); |
| LOG(INFO) << "Resetting events manager"; |
| events_manager_.Reset(); |
| LOG(INFO) << "Reset completed"; |
| } |
| |
| // Even though we don't use replay events to bootstrap machine state, we still |
| // need to parse them to ensure they are valid. |
| // Keeps them available in case we need to use them in future. |
| absl::StatusOr<std::vector<milotic::Event>> events = |
| CperEvent::ParseReplayEvents(request_data.replay_events_json()); |
| if (!events.ok()) { |
| if (events.status().code() == absl::StatusCode::kInvalidArgument) { |
| LOG(ERROR) << "Failed to parse events: " << events.status() |
| << "\nRequest body: " << request->GetBody(); |
| |
| response.SetBody("Failed to parse events"); |
| return response; |
| } |
| LOG(ERROR) << "Failed to parse events: " << events.status(); |
| return events.status(); |
| } |
| |
| int last_event_id = 0; |
| if (!request_data.last_event_id().empty() && |
| !absl::SimpleAtoi(request_data.last_event_id(), &last_event_id)) { |
| LOG(ERROR) << "LastEventId not valid, should be an int" |
| << request_data.last_event_id(); |
| response.SetBody(absl::StrCat("LastEventId not valid, should be an int: ", |
| request_data.last_event_id())); |
| return response; |
| } |
| |
| // First request since init, set connection id |
| storage_manager_->SetConnectionId(request_id); |
| LOG(INFO) << "Setting connection id: " << request_id; |
| |
| events_manager_.SetCnsDir(request_data.cns_dir_path()); |
| if (last_event_id > 0) { |
| events_manager_.SetLastEventId(last_event_id); |
| LOG(INFO) << "Setting last event id: " << last_event_id; |
| } |
| |
| // We don't need to use replay events to bootstrap machine state. |
| // Existing events will be Archived after vBMC init and machine state is only |
| // used by machdocd which persists the symptoms created based on previous |
| // state. |
| storage_manager_->SetProcessedEvents(std::vector<Event>{}); |
| response.SetCode(HTTP_CODE_REQUEST_OK); |
| return response; |
| } |
| |
| absl::Status CperEventsPlugin::Subscribe(std::unique_ptr<ProxyRequest> request, |
| EventHandler* handler) { |
| VLOG(kVlogVerbosity) << "Request path: " << request->GetPath(); |
| if (request->GetPath() == subscribe_events_path_) { |
| absl::string_view request_id = request->GetHeader(kRequestIdHeader); |
| if (request_id.empty()) { |
| LOG(ERROR) << "ReqID not present in request"; |
| return absl::InvalidArgumentError("ReqID not present in request"); |
| } |
| if (storage_manager_->ConnectionIdIsEmpty() || |
| !storage_manager_->CheckConnectionId(request_id)) { |
| LOG(ERROR) << "Connection id is not set or is different"; |
| return absl::InvalidArgumentError( |
| "Connection id is not set or is different"); |
| } |
| // Only accept the stream for the collector we received PUT request |
| auto response = ProxyResponse(HTTP_CODE_REQUEST_OK); |
| if (absl::Status status = handler->OnResponse(response); !status.ok()) { |
| LOG(ERROR) << "OnResponse failed: " << status; |
| return absl::InternalError( |
| absl::StrCat("On Response failed: ", status.message())); |
| } |
| |
| LOG(INFO) << "Setting event handler"; |
| storage_manager_->SetEventHandler(handler); |
| LOG(INFO) << "Waiting for disconnect"; |
| storage_manager_->AwaitDisconnect(); |
| LOG(INFO) << "Resetting connection"; |
| storage_manager_->ResetEventHandler(); |
| return absl::OkStatus(); |
| } |
| |
| return absl::InternalError("Unable to handle Subscribe on path: " + |
| std::string(request->GetPath())); |
| } |
| |
| CperEventsPlugin::~CperEventsPlugin() { |
| events_queue_.Shutdown(/*run_enqueued_jobs=*/false); |
| events_manager_.Shutdown(); |
| } |
| |
| REGISTER_REDFISH_PLUGIN(cper_events, CperEventsPlugin); |
| |
| } // namespace milotic |