blob: c4fe5ef0f55c2ad1b4b056415830070395c961d5 [file] [log] [blame] [edit]
#include "cper/plugin.h"
#include <stdlib.h>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "google/protobuf/any.pb.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "redfish_query_engine/http/codes.h"
#include "nlohmann/json.hpp"
#include "nlohmann/json_fwd.hpp"
#include "cper/cper_event.h"
#include "cper/metrics.h"
#include "monitoring.h"
#include "proxy.h"
#include "proxy_builder.h"
#include "redfish_plugin.h"
#include "request_response.h"
#include "sse_plugin/event.h"
#include "events.pb.h"
#include "sse_plugin/events_manager.h"
#include "utils/std_thread.h"
#include "voyager/priority_queue.hpp"
namespace milotic {
namespace {
// Verbosity level passed to the VLOG statements in this file.
constexpr int kVlogVerbosity = 1;
} // namespace
// POST path that archives processed events and then resets the storage and
// events managers (served by HandleResetErrorCounters).
constexpr absl::string_view kResetErrorCounters =
"/google/cper/reset_error_counters";
// POST path that schedules termination of the process (served by
// HandlePostForceKill).
constexpr absl::string_view kPostForceKillPath = "/google/cper/forcekill";
// GET path that returns the rolled-up machine health document (served by
// HandleGetHealth). Its last path segment is reported as the document "Id".
constexpr absl::string_view kGetMachineHealthPath =
"/google/v1/Chassis/stellaris/Oem/Google/Health";
// Value reported in the "Name" field of the machine health document.
constexpr absl::string_view kMachineHealthResourceName = "ots-cn-health-status";
// Request header carrying the caller's request id. The processed-events PUT
// handler and Subscribe both reject requests where it is empty.
constexpr absl::string_view kRequestIdHeader = "vbmc-internal:ReqID";
// HTTP response codes used by the request handlers in this file.
using ::ecclesia::HttpResponseCode::HTTP_CODE_BAD_REQUEST;
using ::ecclesia::HttpResponseCode::HTTP_CODE_FORBIDDEN;
using ::ecclesia::HttpResponseCode::HTTP_CODE_REQUEST_OK;
using ::ecclesia::HttpResponseCode::HTTP_CODE_SERVICE_UNAV;
using ::ecclesia::HttpResponseCode::HTTP_CODE_UNAUTHORIZED;
// Binds the plugin and its events manager to `proxy` and, unless disabled
// (e.g. by tests), starts the events manager's main thread.
//
// Always returns absl::OkStatus().
absl::Status CperEventsPlugin::Initialize(Proxy* proxy) {
  VLOG(kVlogVerbosity) << "Initialize";

  // The plugin keeps its own pointer and hands the same proxy to the events
  // manager.
  proxy_ = proxy;
  events_manager_.SetProxy(proxy);

  // Certain tests run without an initialized events manager.
  if (init_events_manager_) {
    events_manager_.StartMainThread();
  }

  VLOG(kVlogVerbosity) << "Initialize done";
  return absl::OkStatus();
}
// Builds the list of (verb, path) endpoints this plugin serves and lets
// Proxy::GetRequestAction pick the action for `request` from that list.
CperEventsPlugin::RequestAction CperEventsPlugin::PreprocessRequest(
    RedfishPlugin::RequestVerb request_verb, ProxyRequest& request) {
  using Verb = RedfishPlugin::RequestVerb;
  using VerbAndPath = std::pair<Verb, absl::string_view>;

  std::vector<VerbAndPath> handled_endpoints;
  handled_endpoints.reserve(5);
  // Endpoints whose paths are held in plugin members.
  handled_endpoints.emplace_back(Verb::kPut, put_processed_events_path_);
  handled_endpoints.emplace_back(Verb::kSubscribe, subscribe_events_path_);
  // Fixed endpoints.
  handled_endpoints.emplace_back(Verb::kGet, kGetMachineHealthPath);
  handled_endpoints.emplace_back(Verb::kPost, kPostForceKillPath);
  handled_endpoints.emplace_back(Verb::kPost, kResetErrorCounters);

  return milotic::Proxy::GetRequestAction(request_verb, request,
                                          handled_endpoints);
}
// Dispatches a request to the handler matching its verb and path.
//
// The handler's result is passed through CaptureResponseCode together with the
// cper_response_code metric and the request path. The whole call is timed by a
// LatencyMonitor keyed on the verb and path. A request matching no endpoint
// gets a 400 text/plain response naming the path.
absl::StatusOr<ProxyResponse> CperEventsPlugin::HandleRequest(
    RedfishPlugin::RequestVerb verb,
    std::unique_ptr<ProxyRequest> http_request) {
  VLOG(kVlogVerbosity) << "Request path: " << http_request->GetPath();
  // The request is moved into whichever handler runs, so keep an owned copy of
  // the path for the latency monitor and the metric.
  const std::string request_path = std::string(http_request->GetPath());
  LatencyMonitor latency_monitor(&CperMetrics::Get().cper_request_latency,
                                 {VerbToString(verb), request_path});

  using Verb = RedfishPlugin::RequestVerb;
  switch (verb) {
    case Verb::kPut:
      if (request_path == put_processed_events_path_) {
        return CaptureResponseCode(
            HandlePutProcessedEvents(std::move(http_request)),
            &CperMetrics::Get().cper_response_code, request_path,
            /*field_values=*/{});
      }
      break;
    case Verb::kGet:
      if (request_path == kGetMachineHealthPath) {
        return CaptureResponseCode(HandleGetHealth(std::move(http_request)),
                                   &CperMetrics::Get().cper_response_code,
                                   request_path,
                                   /*field_values=*/{});
      }
      break;
    case Verb::kPost:
      if (request_path == kPostForceKillPath) {
        return CaptureResponseCode(
            HandlePostForceKill(std::move(http_request)),
            &CperMetrics::Get().cper_response_code, request_path,
            /*field_values=*/{});
      }
      if (request_path == kResetErrorCounters) {
        return CaptureResponseCode(
            HandleResetErrorCounters(std::move(http_request)),
            &CperMetrics::Get().cper_response_code, request_path,
            /*field_values=*/{});
      }
      break;
    default:
      break;
  }

  // No endpoint matched this verb and path.
  return ProxyResponse(
      HTTP_CODE_BAD_REQUEST,
      absl::StrCat("Unable to handle request on path: ", request_path),
      {{"Content-Type", "text/plain"}});
}
static void KillThreadRun() {
LOG(INFO) << "Kill vBMC started";
absl::SleepFor(absl::Seconds(5));
// go/totw/20 - Use quick_exit.
LOG(INFO) << "Killing vBMC";
quick_exit(0);
}
// Handles POST on the force-kill path by starting, at most once, a background
// thread running KillThreadRun.
//
// `request` is not inspected. Returns 200 with an empty text/plain body when
// the kill thread has been started by this or an earlier call. If the thread
// cannot be started, returns 503 with the failure message and forgets the
// thread object so that a later request can try again; previously this case
// still answered 200 and left a never-started thread installed, which turned
// every subsequent request into a silent no-op.
absl::StatusOr<milotic::ProxyResponse> CperEventsPlugin::HandlePostForceKill(
    std::unique_ptr<milotic::ProxyRequest> request) {
  milotic::ProxyResponse response(HTTP_CODE_REQUEST_OK, "",
                                  {{"Content-Type", "text/plain"}});
  // Serializes concurrent force-kill requests around kill_thread_.
  absl::MutexLock lock(kill_mutex_);
  if (kill_thread_ != nullptr) {
    // A kill thread was already started by an earlier request.
    return response;
  }

  kill_thread_ = std::make_unique<StdThread>("KillThread", KillThreadRun);
  if (absl::Status status = kill_thread_->Start(); !status.ok()) {
    LOG(ERROR) << "Failed to start kill thread, status: " << status;
    // NOTE(review): assumes destroying a StdThread whose Start() failed is
    // safe — confirm against StdThread.
    kill_thread_.reset();
    response.SetCode(HTTP_CODE_SERVICE_UNAV);
    response.SetBody(
        absl::StrCat("Failed to start kill thread: ", status.message()));
  }
  return response;
}
// Handles POST on the reset-error-counters path: archives the processed events
// and then resets the storage manager and the events manager.
//
// `request` is not inspected. The response is text/plain and is
//   - 503 while the events manager is neither serving nor stopping,
//   - 503 when the storage manager heartbeat fails,
//   - 503 when archiving the processed events fails,
//   - 200 with body "Reset completed" otherwise.
absl::StatusOr<milotic::ProxyResponse>
CperEventsPlugin::HandleResetErrorCounters(
    std::unique_ptr<milotic::ProxyRequest> request) {
  milotic::ProxyResponse reply(HTTP_CODE_REQUEST_OK, "",
                               {{"Content-Type", "text/plain"}});
  // Held for the whole handler so that resets do not interleave.
  absl::MutexLock lock(reset_error_counters_mutex_);

  const bool plugin_ready =
      events_manager_.IsServing() || events_manager_.IsStopping();
  if (!plugin_ready) {
    reply.SetCode(HTTP_CODE_SERVICE_UNAV);
    reply.SetBody(absl::StrCat("vBMC is Initializing, current state: ",
                               events_manager_.ExplainPluginState()));
    return reply;
  }

  if (absl::Status heartbeat = storage_manager_->SendHeartbeat();
      !heartbeat.ok()) {
    reply.SetCode(HTTP_CODE_SERVICE_UNAV);
    reply.SetBody("vBMC is not connected to the collector");
    return reply;
  }

  // Archive events in the storage manager first and then reset.
  LOG(INFO) << "Archiving processed events";
  if (absl::Status archive_status = storage_manager_->ArchiveProcessedEvents();
      !archive_status.ok()) {
    reply.SetCode(HTTP_CODE_SERVICE_UNAV);
    reply.SetBody(absl::StrCat("Failed to archive processed events: ",
                               archive_status.message()));
    return reply;
  }

  LOG(INFO) << "Resetting storage manager";
  storage_manager_->Reset();
  LOG(INFO) << "Resetting events manager";
  events_manager_.Reset();

  reply.SetBody("Reset completed");
  return reply;
}
// Handles GET on the machine health path.
//
// `request` is not inspected. While the events manager is neither serving nor
// stopping, answers 503 with a text body describing the plugin state.
// Otherwise answers 200 with a JSON document holding the resource identity,
// the last health change details, the rolled-up health and the CNS directory.
absl::StatusOr<ProxyResponse> CperEventsPlugin::HandleGetHealth(
    std::unique_ptr<ProxyRequest> request) {
  ProxyResponse reply(HTTP_CODE_BAD_REQUEST, "",
                      {{"Content-Type", "text/plain"}});

  const bool plugin_ready =
      events_manager_.IsServing() || events_manager_.IsStopping();
  if (!plugin_ready) {
    reply.SetCode(HTTP_CODE_SERVICE_UNAV);
    reply.SetBody(absl::StrCat("vBMC is Initializing, current state: ",
                               events_manager_.ExplainPluginState()));
    return reply;
  }

  reply.SetCode(HTTP_CODE_REQUEST_OK);

  // The resource id is the last segment of the health path.
  const absl::string_view resource_id =
      kGetMachineHealthPath.substr(kGetMachineHealthPath.rfind('/') + 1);
  nlohmann::json health_json = {
      {"@odata.id", kGetMachineHealthPath},
      {"@odata.type", "#OtsHealth.v1_0_0.OtsHealth"},
      {"Id", resource_id},
      {"Name", kMachineHealthResourceName},
      {"Description", "Rolled up health status for the OTS CN Arena"},
  };
  health_json["LastHealthChange"] =
      events_manager_.machine_state_->LastHealthChangeDetails();
  health_json["Status"]["HealthRollup"] =
      MachineHealthToString(events_manager_.machine_state_->health());
  health_json["CnsDirPath"] = events_manager_.GetCnsDir();

  reply.SetBody(health_json.dump());
  reply.AddHeader("Content-Type", "application/json");
  reply.AddHeader("OData-Version", "4.0");
  return reply;
}
// Handles the PUT of processed events that registers a collector.
//
// The body must be a serialized google.protobuf.Any wrapping a RequestData,
// and the request must carry the request-id header. On success the request id
// becomes the storage manager's connection id, the CNS directory and (when
// positive) the last event id are stored in the events manager, and the
// processed-events list is set to empty.
//
// The response is text/plain with one of:
//   - 400 (the default) when the plugin is stopping, the body cannot be
//     parsed or unpacked, the request id is missing, the replay events are
//     invalid, or LastEventId is not an integer;
//   - 403 when a request-id filter is configured and does not match;
//   - 401 when a connection id is already set and its heartbeat succeeds;
//   - 200 on success.
// A non-OK status is returned only when parsing the replay events fails with
// a code other than kInvalidArgument.
absl::StatusOr<ProxyResponse> CperEventsPlugin::HandlePutProcessedEvents(
    std::unique_ptr<ProxyRequest> request) {
  // Every early return reuses this response; it stays 400 unless a branch
  // overrides the code.
  ProxyResponse response(HTTP_CODE_BAD_REQUEST, "",
                         {{"Content-Type", "text/plain"}});
  if (events_manager_.IsStopping()) {
    response.SetBody("vBMC is stopping");
    return response;
  }

  google::protobuf::Any any;
  if (!any.ParseFromString(request->GetBody())) {
    // Separator added so the message and the body are not fused in the log.
    VLOG(kVlogVerbosity) << "Failed to parse Request body to proto Any: "
                         << request->GetBody();
    response.SetBody("Failed to parse Request body to proto Any");
    return response;
  }
  platforms_vbmc::RequestData request_data;
  if (!any.UnpackTo(&request_data)) {
    VLOG(kVlogVerbosity) << "Failed to unpack to RequestData";
    response.SetBody("Failed to unpack to RequestData");
    return response;
  }

  absl::string_view request_id = request->GetHeader(kRequestIdHeader);
  if (request_id.empty()) {
    VLOG(kVlogVerbosity) << "ReqID not present in request";
    response.SetBody("ReqID not present in request");
    return response;
  }
  // An empty filter accepts every request id.
  if (!collector_request_id_filter_.empty() &&
      collector_request_id_filter_ != request_id) {
    response.SetCode(HTTP_CODE_FORBIDDEN);
    response.SetBody(absl::StrCat("Collector request id filter set in config: ",
                                  collector_request_id_filter_,
                                  " did not match requestID: ", request_id));
    VLOG(kVlogVerbosity) << response.GetBody();
    return response;
  }

  std::string connection_id = storage_manager_->ConnectionId();
  if (!connection_id.empty()) {
    // If event handler id is already set
    // Check if the stream connection is active
    absl::Status status = storage_manager_->SendHeartbeat();
    if (status.ok()) {
      // If the stream is active, we should ignore the request
      // since its from a diff collector since each collector can only send
      // one stream request
      // NOTE(review): request_id is not compared with connection_id here, so
      // any PUT received while the stream is alive is rejected — confirm.
      LOG_EVERY_N_SEC(WARNING, 10)
          << "Got PUT request from collector with diff ID, ignoring";
      response.SetCode(HTTP_CODE_UNAUTHORIZED);
      response.SetBody("vBMC already registered with another collector");
      return response;
    }
    // If the stream is not active, we should restart vBMC
    LOG(INFO) << "Received processed events request from new collector, old "
                 "stream is not active, resetting events manager";
    LOG(INFO) << "Old ConnectionID: " << connection_id;
    LOG(INFO) << "New ConnectionID: " << request_id;
    LOG(INFO) << "Request data: " << request_data;
    LOG(INFO) << "Resetting storage manager";
    storage_manager_->Reset();
    LOG(INFO) << "Resetting events manager";
    events_manager_.Reset();
    LOG(INFO) << "Reset completed";
  }

  // Even though we don't use replay events to bootstrap machine state, we still
  // need to parse them to ensure they are valid.
  // Keeps them available in case we need to use them in future.
  // NOTE(review): this validation and the LastEventId check below run after
  // the reset above, so a malformed request can still trigger a reset —
  // confirm this ordering is intended.
  absl::StatusOr<std::vector<milotic::Event>> events =
      CperEvent::ParseReplayEvents(request_data.replay_events_json());
  if (!events.ok()) {
    if (events.status().code() == absl::StatusCode::kInvalidArgument) {
      LOG(ERROR) << "Failed to parse events: " << events.status()
                 << "\nRequest body: " << request->GetBody();
      response.SetBody("Failed to parse events");
      return response;
    }
    LOG(ERROR) << "Failed to parse events: " << events.status();
    return events.status();
  }

  // An empty LastEventId is accepted and leaves last_event_id at 0.
  int last_event_id = 0;
  if (!request_data.last_event_id().empty() &&
      !absl::SimpleAtoi(request_data.last_event_id(), &last_event_id)) {
    // Separator added so the message and the offending value are not fused.
    LOG(ERROR) << "LastEventId not valid, should be an int: "
               << request_data.last_event_id();
    response.SetBody(absl::StrCat("LastEventId not valid, should be an int: ",
                                  request_data.last_event_id()));
    return response;
  }

  // First request since init, set connection id
  storage_manager_->SetConnectionId(request_id);
  LOG(INFO) << "Setting connection id: " << request_id;
  events_manager_.SetCnsDir(request_data.cns_dir_path());
  if (last_event_id > 0) {
    events_manager_.SetLastEventId(last_event_id);
    LOG(INFO) << "Setting last event id: " << last_event_id;
  }
  // We don't need to use replay events to bootstrap machine state.
  // Existing events will be Archived after vBMC init and machine state is only
  // used by machdocd which persists the symptoms created based on previous
  // state.
  storage_manager_->SetProcessedEvents(std::vector<Event>{});
  response.SetCode(HTTP_CODE_REQUEST_OK);
  return response;
}
// Serves an event-stream subscription on the subscribe path.
//
// The request must carry a request id that matches the connection id stored
// by the storage manager. Once accepted, a 200 response is sent through
// `handler`, the handler is installed in the storage manager, and the call
// blocks in AwaitDisconnect() before clearing the handler again.
//
// Returns:
//   - InternalError for any other path, or when handler->OnResponse fails;
//   - InvalidArgumentError when the request id is missing, or the connection
//     id is unset or different;
//   - OkStatus after the stream has disconnected.
absl::Status CperEventsPlugin::Subscribe(std::unique_ptr<ProxyRequest> request,
                                         EventHandler* handler) {
  VLOG(kVlogVerbosity) << "Request path: " << request->GetPath();

  const bool is_subscribe_path = request->GetPath() == subscribe_events_path_;
  if (!is_subscribe_path) {
    return absl::InternalError(absl::StrCat(
        "Unable to handle Subscribe on path: ", request->GetPath()));
  }

  absl::string_view request_id = request->GetHeader(kRequestIdHeader);
  if (request_id.empty()) {
    LOG(ERROR) << "ReqID not present in request";
    return absl::InvalidArgumentError("ReqID not present in request");
  }

  // Only accept the stream for the collector we received PUT request
  const bool id_accepted = !storage_manager_->ConnectionIdIsEmpty() &&
                           storage_manager_->CheckConnectionId(request_id);
  if (!id_accepted) {
    LOG(ERROR) << "Connection id is not set or is different";
    return absl::InvalidArgumentError(
        "Connection id is not set or is different");
  }

  ProxyResponse ok_response(HTTP_CODE_REQUEST_OK);
  absl::Status response_status = handler->OnResponse(ok_response);
  if (!response_status.ok()) {
    LOG(ERROR) << "OnResponse failed: " << response_status;
    return absl::InternalError(
        absl::StrCat("On Response failed: ", response_status.message()));
  }

  LOG(INFO) << "Setting event handler";
  storage_manager_->SetEventHandler(handler);
  LOG(INFO) << "Waiting for disconnect";
  storage_manager_->AwaitDisconnect();
  LOG(INFO) << "Resetting connection";
  storage_manager_->ResetEventHandler();
  return absl::OkStatus();
}
// Shuts the plugin down on destruction: first the events queue, then the
// events manager.
CperEventsPlugin::~CperEventsPlugin() {
// Ask the queue not to run jobs that are still enqueued.
events_queue_.Shutdown(/*run_enqueued_jobs=*/false);
events_manager_.Shutdown();
}
// Registers CperEventsPlugin under the name `cper_events`. The macro is
// defined elsewhere; presumably it adds the class to the Redfish plugin
// registry — confirm against redfish_plugin.h.
REGISTER_REDFISH_PLUGIN(cper_events, CperEventsPlugin);
} // namespace milotic