blob: 9b145455a85e4f859f1e3d4e7f09a20dffab797c [file] [log] [blame]
#include "cper/plugin.h"
#include <stdlib.h>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "google/protobuf/any.pb.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "redfish_query_engine/http/client.h"
#include "redfish_query_engine/http/codes.h"
#include "nlohmann/json.hpp"
#include "nlohmann/json_fwd.hpp"
#include "cper/metrics.h"
#include "monitoring.h"
#include "proxy.h"
#include "proxy_builder.h"
#include "redfish_plugin.h"
#include "request_response.h"
#include "sse_plugin/event.h"
#include "events.pb.h"
#include "sse_plugin/events_manager.h"
#include "utils/std_thread.h"
#include "voyager/priority_queue.hpp"
namespace milotic {
namespace {
// VLOG level used for this plugin's verbose request tracing.
constexpr int kVlogVerbosity = 1;
} // namespace
// POST path handled by HandleResetErrorCounters: archives processed events,
// then resets the storage manager and the events manager.
constexpr absl::string_view kResetErrorCounters =
"/google/cper/reset_error_counters";
// POST path handled by HandlePostForceKill: schedules termination of the
// vBMC process.
constexpr absl::string_view kPostForceKillPath = "/google/cper/forcekill";
// GET path handled by HandleGetHealth; its last segment ("Health") is also
// reported as the "Id" of the returned JSON resource.
constexpr absl::string_view kGetMachineHealthPath =
"/google/v1/Chassis/stellaris/Oem/Google/Health";
// Value of the "Name" field in the machine-health JSON response.
constexpr absl::string_view kMachineHealthResourceName = "ots-cn-health-status";
// Request header carrying the collector's request ID; the PUT handler stores
// it as the connection ID and Subscribe checks it against that ID.
constexpr absl::string_view kRequestIdHeader = "vbmc-internal:ReqID";
// HTTP status codes returned by the handlers in this file.
using ::ecclesia::HttpResponseCode::HTTP_CODE_BAD_REQUEST;
using ::ecclesia::HttpResponseCode::HTTP_CODE_FORBIDDEN;
using ::ecclesia::HttpResponseCode::HTTP_CODE_REQUEST_OK;
using ::ecclesia::HttpResponseCode::HTTP_CODE_SERVICE_UNAV;
using ::ecclesia::HttpResponseCode::HTTP_CODE_UNAUTHORIZED;
// Attaches the plugin (and its events manager) to `proxy`. The events
// manager's main thread is started only when `init_events_manager_` is set;
// some tests leave it unset. Always returns OK.
absl::Status CperEventsPlugin::Initialize(Proxy* proxy) {
  VLOG(kVlogVerbosity) << "Initialize";
  proxy_ = proxy;
  events_manager_.SetProxy(proxy_);
  const bool start_main_thread = init_events_manager_;
  if (start_main_thread) {
    events_manager_.StartMainThread();
  }
  VLOG(kVlogVerbosity) << "Initialize done";
  return absl::OkStatus();
}
// Lists every (verb, path) pair this plugin serves and lets the proxy decide,
// from that list, what to do with `request`.
CperEventsPlugin::RequestAction CperEventsPlugin::PreprocessRequest(
    RedfishPlugin::RequestVerb request_verb, ProxyRequest& request) {
  using Verb = RedfishPlugin::RequestVerb;
  std::vector<std::pair<Verb, absl::string_view>> routes;
  routes.reserve(5);  // One entry per handled route below.
  routes.emplace_back(Verb::kPut, put_processed_events_path_);
  routes.emplace_back(Verb::kSubscribe, subscribe_events_path_);
  routes.emplace_back(Verb::kGet, kGetMachineHealthPath);
  routes.emplace_back(Verb::kPost, kPostForceKillPath);
  routes.emplace_back(Verb::kPost, kResetErrorCounters);
  return proxy_->GetRequestAction(request_verb, request, routes);
}
// Routes a non-subscribe request to its handler by (verb, path). Latency is
// recorded for every request. The handler's outcome is recorded in the
// response-code metric, labelled by path. Unknown routes get a 400 plain-text
// response.
absl::StatusOr<ProxyResponse> CperEventsPlugin::HandleRequest(
    RedfishPlugin::RequestVerb verb,
    std::unique_ptr<ProxyRequest> http_request) {
  VLOG(kVlogVerbosity) << "Request uri: " << http_request->uri;
  // Copy the path out now: `http_request` is moved into a handler below, but
  // the latency monitor and the response-code metric still need the path.
  const std::string request_path = proxy_->GetPath(http_request->uri);
  LatencyMonitor latency_monitor(&CperMetrics::Get().cper_request_latency,
                                 {VerbToString(verb), request_path});

  // Wraps a handler result so its response code is counted for this path.
  auto record_code = [&request_path](absl::StatusOr<ProxyResponse> result) {
    return CaptureResponseCode(std::move(result),
                               &CperMetrics::Get().cper_response_code,
                               request_path,
                               /*field_values=*/{});
  };

  switch (verb) {
    case RedfishPlugin::RequestVerb::kPut:
      if (request_path == put_processed_events_path_) {
        return record_code(HandlePutProcessedEvents(std::move(http_request)));
      }
      break;
    case RedfishPlugin::RequestVerb::kGet:
      if (request_path == kGetMachineHealthPath) {
        return record_code(HandleGetHealth(std::move(http_request)));
      }
      break;
    case RedfishPlugin::RequestVerb::kPost:
      if (request_path == kPostForceKillPath) {
        return record_code(HandlePostForceKill(std::move(http_request)));
      }
      if (request_path == kResetErrorCounters) {
        return record_code(HandleResetErrorCounters(std::move(http_request)));
      }
      break;
    default:
      break;
  }

  return ProxyResponse(
      HTTP_CODE_BAD_REQUEST,
      absl::StrCat("Unable to handle request on uri: ", http_request->uri),
      {{"Content-Type", "text/plain"}});
}
// Entry point of the force-kill thread: waits a short grace period, then
// ends the process with exit code 0 via quick_exit. quick_exit runs
// at_quick_exit handlers but not static destructors (go/totw/20).
static void KillThreadRun() {
  const auto grace_period = absl::Seconds(5);
  LOG(INFO) << "Kill vBMC started";
  absl::SleepFor(grace_period);
  LOG(INFO) << "Killing vBMC";
  quick_exit(0);
}
// Handles POST on kPostForceKillPath. Replies 200 right away and, under
// `kill_mutex_`, starts the KillThreadRun thread at most once, so the process
// exits after KillThreadRun's delay. Later requests are acknowledged with 200
// but do not start another thread. The request body is ignored.
absl::StatusOr<milotic::ProxyResponse> CperEventsPlugin::HandlePostForceKill(
    std::unique_ptr<milotic::ProxyRequest> request) {
  {
    absl::MutexLock lock(&kill_mutex_);
    if (kill_thread_ == nullptr) {
      // Pass the function directly: std::move has no effect on a function
      // name.
      kill_thread_ = std::make_unique<StdThread>("KillThread", KillThreadRun);
      kill_thread_->Start();
    }
  }
  return milotic::ProxyResponse(HTTP_CODE_REQUEST_OK,
                                {{"Content-Type", "text/plain"}});
}
// Handles POST on kResetErrorCounters: archives the processed events, then
// resets the storage manager and the events manager.
//
// `reset_error_counters_mutex_` is held for the whole handler, so concurrent
// reset requests run one after another.
//
// Responses (text/plain):
//   503 - plugin is neither serving nor stopping, the collector heartbeat
//         fails, or archiving fails (nothing is reset in these cases).
//   200 - "Reset completed".
absl::StatusOr<milotic::ProxyResponse>
CperEventsPlugin::HandleResetErrorCounters(
    std::unique_ptr<milotic::ProxyRequest> request) {
  milotic::ProxyResponse response(HTTP_CODE_REQUEST_OK,
                                  {{"Content-Type", "text/plain"}});
  absl::MutexLock lock(&reset_error_counters_mutex_);
  if (!events_manager_.IsServing() && !events_manager_.IsStopping()) {
    response.code = HTTP_CODE_SERVICE_UNAV;
    response.body = absl::StrCat("vBMC is Initializing, current state: ",
                                 events_manager_.ExplainPluginState());
    return response;
  }
  // Refuse to reset unless the collector stream is alive. Keep the failure
  // reason in the log instead of dropping it.
  if (absl::Status heartbeat = storage_manager_->SendHeartbeat();
      !heartbeat.ok()) {
    LOG(WARNING) << "Reset error counters rejected, heartbeat failed: "
                 << heartbeat;
    response.code = HTTP_CODE_SERVICE_UNAV;
    response.body = "vBMC is not connected to the collector";
    return response;
  }
  // Archive events in the storage manager first and then reset.
  LOG(INFO) << "Archiving processed events";
  absl::Status status = storage_manager_->ArchiveProcessedEvents();
  if (!status.ok()) {
    LOG(ERROR) << "Failed to archive processed events: " << status;
    response.code = HTTP_CODE_SERVICE_UNAV;
    response.body =
        absl::StrCat("Failed to archive processed events: ", status.message());
    return response;
  }
  LOG(INFO) << "Resetting storage manager";
  storage_manager_->Reset();
  LOG(INFO) << "Resetting events manager";
  events_manager_.Reset();
  response.body = "Reset completed";
  return response;
}
// Handles GET on kGetMachineHealthPath.
//
// When the events manager is serving or stopping, returns 200 with a JSON
// resource describing the machine's rolled-up health, its last health change
// and the CNS directory path. Otherwise returns 503 with a plain-text
// explanation of the current plugin state.
absl::StatusOr<ProxyResponse> CperEventsPlugin::HandleGetHealth(
    std::unique_ptr<ProxyRequest> request) {
  ProxyResponse response(HTTP_CODE_BAD_REQUEST,
                         {{"Content-Type", "text/plain"}});
  const bool ready =
      events_manager_.IsServing() || events_manager_.IsStopping();
  if (!ready) {
    response.code = HTTP_CODE_SERVICE_UNAV;
    response.body = absl::StrCat("vBMC is Initializing, current state: ",
                                 events_manager_.ExplainPluginState());
    return response;
  }

  // The resource Id is the last path segment of the health URI.
  const absl::string_view resource_id =
      kGetMachineHealthPath.substr(kGetMachineHealthPath.rfind('/') + 1);
  nlohmann::json health_json = {
      {"@odata.id", kGetMachineHealthPath},
      {"@odata.type", "#OtsHealth.v1_0_0.OtsHealth"},
      {"Id", resource_id},
      {"Name", kMachineHealthResourceName},
      {"Description", "Rolled up health status for the OTS CN Arena"},
  };
  health_json["LastHealthChange"] =
      events_manager_.machine_state_->LastHealthChangeDetails();
  health_json["Status"]["HealthRollup"] =
      MachineHealthToString(events_manager_.machine_state_->health());
  health_json["CnsDirPath"] = events_manager_.GetCnsDir();

  response.code = HTTP_CODE_REQUEST_OK;
  response.body = health_json.dump();
  response.headers = {{"Content-Type", "application/json"},
                      {"OData-Version", "4.0"}};
  return response;
}
// Handles PUT on `put_processed_events_path_`. A collector uses this to
// register itself before opening the event stream handled by Subscribe().
//
// The body must be a serialized google.protobuf.Any that unpacks to
// platforms_vbmc::RequestData. The request must carry a non-empty
// `kRequestIdHeader`. On success:
//   - the request ID becomes the storage manager's connection ID;
//   - the CNS directory (and LastEventId, when > 0) is forwarded to the
//     events manager;
//   - the processed-event list is cleared.
//
// Response codes (text/plain body on error):
//   400 - plugin stopping, undecodable body, missing ReqID, or a LastEventId
//         that is not an integer.
//   403 - `collector_request_id_filter_` is set and does not match ReqID.
//   401 - a connection ID is already set and its stream answers a heartbeat.
//   200 - collector registered.
absl::StatusOr<ProxyResponse> CperEventsPlugin::HandlePutProcessedEvents(
    std::unique_ptr<ProxyRequest> request) {
  ProxyResponse response(HTTP_CODE_BAD_REQUEST,
                         {{"Content-Type", "text/plain"}});
  if (events_manager_.IsStopping()) {
    response.body = "vBMC is stopping";
    return response;
  }

  google::protobuf::Any any;
  if (!any.ParseFromString(request->body)) {
    VLOG(kVlogVerbosity) << "Failed to parse Request body to proto Any: "
                         << request->body;
    response.body = "Failed to parse Request body to proto Any";
    return response;
  }
  platforms_vbmc::RequestData request_data;
  if (!any.UnpackTo(&request_data)) {
    VLOG(kVlogVerbosity) << "Failed to unpack to RequestData";
    response.body = "Failed to unpack to RequestData";
    return response;
  }

  // `request_id` views into `request`, which stays alive for this whole
  // function. NOTE(review): if `headers` is a map type, operator[] inserts an
  // empty entry when the header is absent; a find()-based lookup would avoid
  // mutating the request.
  absl::string_view request_id = request->headers[kRequestIdHeader];
  if (request_id.empty()) {
    VLOG(kVlogVerbosity) << "ReqID not present in request";
    response.body = "ReqID not present in request";
    return response;
  }
  if (!collector_request_id_filter_.empty() &&
      collector_request_id_filter_ != request_id) {
    response.body = absl::StrCat("Collector request id filter set in config: ",
                                 collector_request_id_filter_,
                                 " did not match requestID: ", request_id);
    response.code = HTTP_CODE_FORBIDDEN;
    VLOG(kVlogVerbosity) << response.body;
    return response;
  }

  // Validate LastEventId before any state is touched. Otherwise a malformed
  // request could reset a stale registration below and then fail without
  // registering a replacement.
  int last_event_id = 0;
  if (!request_data.last_event_id().empty() &&
      !absl::SimpleAtoi(request_data.last_event_id(), &last_event_id)) {
    LOG(ERROR) << "LastEventId not valid, should be an int: "
               << request_data.last_event_id();
    response.body = absl::StrCat("LastEventId not valid, should be an int: ",
                                 request_data.last_event_id());
    return response;
  }

  std::string connection_id = storage_manager_->ConnectionId();
  if (!connection_id.empty()) {
    // A collector is already registered. If its stream still answers a
    // heartbeat, this request comes from a different collector: each
    // collector sends only one stream request. Reject it.
    if (storage_manager_->SendHeartbeat().ok()) {
      LOG_EVERY_N_SEC(WARNING, 10)
          << "Got PUT request from collector with diff ID, ignoring";
      response.code = HTTP_CODE_UNAUTHORIZED;
      response.body = "vBMC already registered with another collector";
      return response;
    }
    // The old stream is no longer active. Drop its state so this collector
    // can take over.
    LOG(INFO) << "Received processed events request from new collector, old "
                 "stream is not active, resetting events manager";
    LOG(INFO) << "Old ConnectionID: " << connection_id;
    LOG(INFO) << "New ConnectionID: " << request_id;
    LOG(INFO) << "Request data: " << request_data;
    LOG(INFO) << "Resetting storage manager";
    storage_manager_->Reset();
    LOG(INFO) << "Resetting events manager";
    events_manager_.Reset();
    LOG(INFO) << "Reset completed";
  }

  // Either no collector was registered yet, or the stale one was just reset:
  // register this collector.
  storage_manager_->SetConnectionId(request_id);
  LOG(INFO) << "Setting connection id: " << request_id;
  events_manager_.SetCnsDir(request_data.cns_dir_path());
  if (last_event_id > 0) {
    events_manager_.SetLastEventId(last_event_id);
    LOG(INFO) << "Setting last event id: " << last_event_id;
  }
  storage_manager_->SetProcessedEvents(std::vector<Event>{});
  response.code = HTTP_CODE_REQUEST_OK;
  return response;
}
// Accepts the event stream on `subscribe_events_path_` for the collector that
// registered through the PUT handler.
//
// The request's ReqID header must match the stored connection ID. On success,
// an OK response is sent through `handler`, `handler` is installed as the
// storage manager's event handler, and the call blocks in AwaitDisconnect()
// until the stream ends. The handler is then reset.
//
// Returns InvalidArgument for a missing or mismatched ReqID, and Internal if
// OnResponse fails or the URI is not the subscribe path.
absl::Status CperEventsPlugin::Subscribe(std::unique_ptr<ProxyRequest> request,
                                         EventHandler* handler) {
  VLOG(kVlogVerbosity) << "Request uri: " << request->uri;
  if (proxy_->GetPath(request->uri) != subscribe_events_path_) {
    return absl::InternalError("Unable to handle Subscribe on uri: " +
                               request->uri);
  }

  const absl::string_view request_id = request->headers[kRequestIdHeader];
  if (request_id.empty()) {
    LOG(ERROR) << "ReqID not present in request";
    return absl::InvalidArgumentError("ReqID not present in request");
  }

  // Only the collector whose PUT request set the connection ID may stream.
  const bool is_registered_collector =
      !storage_manager_->ConnectionIdIsEmpty() &&
      storage_manager_->CheckConnectionId(request_id);
  if (!is_registered_collector) {
    LOG(ERROR) << "Connection id is not set or is different";
    return absl::InvalidArgumentError(
        "Connection id is not set or is different");
  }

  ProxyResponse accepted(HTTP_CODE_REQUEST_OK);
  absl::Status on_response_status = handler->OnResponse(accepted);
  if (!on_response_status.ok()) {
    LOG(ERROR) << "OnResponse failed: " << on_response_status;
    return absl::InternalError(
        absl::StrCat("On Response failed: ", on_response_status.message()));
  }

  LOG(INFO) << "Setting event handler";
  storage_manager_->SetEventHandler(handler);
  LOG(INFO) << "Waiting for disconnect";
  storage_manager_->AwaitDisconnect();
  LOG(INFO) << "Resetting connection";
  storage_manager_->ResetEventHandler();
  return absl::OkStatus();
}
// Shuts down background work in a fixed order. The events queue goes first,
// with run_enqueued_jobs=false (presumably dropping jobs not yet run). The
// events manager goes second.
CperEventsPlugin::~CperEventsPlugin() {
  constexpr bool kRunEnqueuedJobs = false;
  events_queue_.Shutdown(/*run_enqueued_jobs=*/kRunEnqueuedJobs);
  events_manager_.Shutdown();
}
// Registers CperEventsPlugin with the Redfish plugin registry, presumably
// under the name `cper_events` (see redfish_plugin.h to confirm).
REGISTER_REDFISH_PLUGIN(cper_events, CperEventsPlugin);
} // namespace milotic