blob: 8a46385f1965b75aff65ce3d3b9ead6ff251407b [file] [log] [blame] [edit]
#include "proxy_voyager_impl.h"
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/ascii.h"
#include "absl/strings/match.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/time/time.h"
#include "grpc/status.h"
#include "grpcpp/impl/service_type.h"
#include "grpcpp/security/auth_context.h"
#include "grpcpp/server.h"
#include "grpcpp/server_context.h"
#include "grpcpp/support/status.h"
#include "grpcpp/support/sync_stream.h"
#include "proxy.h"
#include "proxy_config.pb.h"
#include "redfish_plugin.h"
#include "request_response.h"
#include "sse/sse_parser.h"
#include "google/protobuf/map.h"
#include "google/protobuf/message.h"
#include "voyager/priority_queue.hpp"
#include "voyager/voyager_telemetry.pb.h"
#include "voyager/fqp.h"
namespace milotic {
using ::third_party_voyager::DataPoint;
using ::third_party_voyager::Fqp;
using ::third_party_voyager::RequestFqp;
using ::third_party_voyager::Update;
constexpr absl::string_view kReqIdHeader = "vbmc-internal:ReqID";
constexpr int kLogThrottleFrequencySec = 10;
static absl::StatusOr<std::unique_ptr<ProxyRequest>> CreateRequestHelper(
const Proxy &proxy, RedfishPlugin::RequestVerb verb,
const RequestFqp &req_fqp,
const google::protobuf::Map<std::string, std::string> &http_headers,
const grpc::AuthContext &auth_context) {
absl::StatusOr<std::string> path = FqpToPathStr(req_fqp.fqp());
if (!path.ok()) {
return path.status();
}
absl::StatusOr<std::unique_ptr<ProxyRequest>> status_or_http_request =
proxy.CreateAuthorizedRequest(verb, *path, auth_context);
if (!status_or_http_request.ok()) return status_or_http_request;
std::unique_ptr<ProxyRequest> http_request =
*std::move(status_or_http_request);
/* Copy http request headers to request */
for (const auto &[key, value] : http_headers) {
// Never override "Host" header.
if (absl::EqualsIgnoreCase(key, "host")) continue;
http_request->AddHeader(key, value);
}
if (req_fqp.export_interval_ns() > 0) {
http_request->AddHeader(
kExportIntervalHeaderKey,
absl::FormatDuration(absl::Nanoseconds(req_fqp.export_interval_ns())));
}
return http_request;
}
// Create Voyager Update from Http response.
static void SetUpdateFromHTTP(Update *update, const ProxyResponse &response,
const Fqp &fqp) {
/* Copy response headers back */
for (const auto &[key, value] : response.GetHeaders()) {
// HTTP headers names are not case sensitive. Changing consistently to
// lowercase allows for easier comparisons, and is consistent with HTTP2
// headers.
std::string key_lower = key;
absl::AsciiStrToLower(&key_lower);
if (value.empty()) continue;
(*update->mutable_http_headers())[std::move(key_lower)] = value;
}
if (!response.GetBody().empty()) {
DataPoint *data_point = update->add_data_points();
*data_point->mutable_res_fqp() = fqp;
data_point->set_json(response.GetBody());
}
update->set_code(response.GetCode());
}
static Proxy::RequestPriority MapRequestPriority(
third_party_voyager::FqpPriority priority) {
switch (priority) {
case third_party_voyager::FQP_PRI_LOW:
return Proxy::RequestPriority::kLow;
case third_party_voyager::FQP_PRI_MED:
return Proxy::RequestPriority::kHigh;
case third_party_voyager::FQP_PRI_HIG:
return Proxy::RequestPriority::kCrit;
// Default to low priority
case third_party_voyager::FQP_PRI_UNSPECIFIED:
return Proxy::RequestPriority::kLow;
default:
LOG_EVERY_N_SEC(WARNING, kLogThrottleFrequencySec)
<< "Invalid priority " << priority;
return Proxy::RequestPriority::kLow;
}
}
grpc::Status ProxyVoyagerImpl::DispatchRequest(
RedfishPlugin::RequestVerb verb,
const third_party_voyager::Request *request,
third_party_voyager::Update *response,
const grpc::AuthContext &auth_context) {
response->set_req_id(request->req_id());
if (request->req_fqp().empty()) {
return grpc::Status::OK;
}
if (request->req_fqp().size() > 1) {
LOG_EVERY_N_SEC(WARNING, kLogThrottleFrequencySec)
<< proxy_.name() << " - Ignoring multiple requests";
}
const RequestFqp &req_fqp = request->req_fqp()[0];
if (req_fqp.encoding() !=
third_party_voyager::Encoding::ENCODING_UNSPECIFIED &&
req_fqp.encoding() != third_party_voyager::Encoding::ENCODING_JSON) {
// TODO: b/322826898 - Support non-json payloads
return grpc::Status(
static_cast<grpc::StatusCode>(GRPC_STATUS_UNIMPLEMENTED),
"Unsupported encoding");
}
absl::StatusOr<std::unique_ptr<ProxyRequest>> http_request =
CreateRequestHelper(proxy_, verb, req_fqp, request->http_headers(),
auth_context);
if (!http_request.ok()) return ConvertStatus(http_request.status());
http_request.value()->AddHeader(kReqIdHeader, request->req_id());
absl::StatusOr<std::unique_ptr<Proxy::RequestJob>> job =
proxy_.DispatchRequestToQueue(verb, std::move(*http_request),
MapRequestPriority(req_fqp.priority()));
if (!job.ok()) return ConvertStatus(job.status());
if (job.value()->Wait() != voyager::Job::JobState::kDone) {
return grpc::Status(static_cast<grpc::StatusCode>(GRPC_STATUS_INTERNAL),
std::string("Job error"));
}
absl::StatusOr<ProxyResponse> &result = job.value()->response();
if (!result.ok()) return ConvertStatus(result.status());
SetUpdateFromHTTP(response, *result, req_fqp.fqp());
return grpc::Status::OK;
}
grpc::Status ProxyVoyagerImpl::DispatchRequest(
RedfishPlugin::RequestVerb verb,
const third_party_voyager::SetRequest *request,
third_party_voyager::Update *response,
const grpc::AuthContext &auth_context) {
response->set_req_id(request->req_id());
if (request->req_fqp().empty()) {
return grpc::Status::OK;
}
if (request->req_fqp().size() > 1) {
LOG_EVERY_N_SEC(WARNING, kLogThrottleFrequencySec)
<< proxy_.name() << " - Ignoring multiple requests";
}
const RequestFqp &req_fqp = request->req_fqp()[0];
absl::StatusOr<std::unique_ptr<ProxyRequest>> http_request =
CreateRequestHelper(proxy_, verb, req_fqp, request->http_headers(),
auth_context);
if (!http_request.ok()) return ConvertStatus(http_request.status());
http_request.value()->AddHeader(kReqIdHeader, request->req_id());
if (request->has_json()) {
http_request.value()->SetBody(request->json());
// This is declared inside the if block to ensure empty Content-Type header
// is not added to http_request.
if (http_request.value()->GetHeader("Content-Type").empty()) {
http_request.value()->AddHeader("Content-Type", "application/json");
}
} else if (request->has_key_value()) {
LOG_EVERY_N_SEC(WARNING, kLogThrottleFrequencySec)
<< proxy_.name()
<< " - Unsupported request type key-value for fqp: " << req_fqp.fqp();
return grpc::Status(
static_cast<grpc::StatusCode>(GRPC_STATUS_UNIMPLEMENTED),
"Unsupported request type");
} else if (request->has_native()) {
// TODO: b/307607427 - Pass native data to plugins w/o http request
http_request.value()->SetBody(request->native().SerializeAsString());
}
absl::StatusOr<std::unique_ptr<Proxy::RequestJob>> job =
proxy_.DispatchRequestToQueue(verb, std::move(*http_request),
MapRequestPriority(req_fqp.priority()));
if (!job.ok()) return ConvertStatus(job.status());
if (job.value()->Wait() != voyager::Job::JobState::kDone) {
return grpc::Status(static_cast<grpc::StatusCode>(GRPC_STATUS_INTERNAL),
"Job error");
}
absl::StatusOr<ProxyResponse> &result = job.value()->response();
if (!result.ok()) return ConvertStatus(result.status());
SetUpdateFromHTTP(response, *result, req_fqp.fqp());
return grpc::Status::OK;
}
grpc::Status ProxyVoyagerImpl::Subscribe(
grpc::ServerContext *context, const third_party_voyager::Request *request,
grpc::ServerWriter<third_party_voyager::Update> *writer) {
if (request->req_fqp().empty()) {
return grpc::Status::OK;
}
if (request->req_fqp().size() > 1) {
LOG(WARNING) << proxy_.name() << " - Ignoring multiple requests";
}
const RequestFqp &req_fqp = request->req_fqp()[0];
absl::StatusOr<std::unique_ptr<ProxyRequest>> http_request =
CreateRequestHelper(proxy_, RedfishPlugin::RequestVerb::kSubscribe,
req_fqp, request->http_headers(),
*context->auth_context());
if (!http_request.ok()) return ConvertStatus(http_request.status());
http_request.value()->AddHeader(kReqIdHeader, request->req_id());
class EventHandler : public RedfishPlugin::EventHandler {
public:
EventHandler(grpc::ServerContext *context,
const third_party_voyager::Request *request,
grpc::ServerWriter<third_party_voyager::Update> *writer)
: context_(context), writer_(writer), req_fqp_(request->req_fqp()[0]) {
update_.set_req_id(request->req_id());
}
absl::Status OnResponse(const ProxyResponse &http_response) override {
writer_->SendInitialMetadata();
if (context_->IsCancelled()) {
return absl::CancelledError();
}
SetUpdateFromHTTP(&update_, http_response, req_fqp_.fqp());
return absl::OkStatus();
}
// Raw SSE events are packed into the key_value field using the event field
// names as keys.
absl::Status OnEvent(const ServerSentEvent &event) override {
update_.clear_data_points();
third_party_voyager::DataPoint *data_point = update_.add_data_points();
*data_point->mutable_res_fqp() = req_fqp_.fqp();
auto &fields = *data_point->mutable_key_value()->mutable_fields();
if (event.event) {
fields["event"].set_string_val(*event.event);
}
if (event.data) {
fields["data"].set_string_val(*event.data);
}
if (event.id) {
fields["id"].set_string_val(*event.id);
}
if (event.retry) {
fields["retry"].set_uint_val(static_cast<uint64_t>(*event.retry));
}
if (!writer_->Write(update_)) {
return absl::CancelledError();
}
return absl::OkStatus();
}
// Plugins may instead generate Update messages directly, in which case they
// are merged with the default Update which includes the result code and
// headers from the initial response.
absl::Status OnEvent(const google::protobuf::Message &event_message) override {
if (event_message.GetTypeName() != update_.GetTypeName()) {
return absl::InvalidArgumentError(absl::StrCat(
"Unsupported event message: ", event_message.GetTypeName()));
}
update_.clear_data_points();
update_.MergeFrom(event_message);
for (auto &data_point : *update_.mutable_data_points()) {
*data_point.mutable_res_fqp() = req_fqp_.fqp();
}
if (!writer_->Write(update_)) {
return absl::CancelledError();
}
return absl::OkStatus();
}
bool IsCancelled() const override { return context_->IsCancelled(); }
private:
grpc::ServerContext *context_;
grpc::ServerWriter<third_party_voyager::Update> *writer_;
const RequestFqp &req_fqp_;
third_party_voyager::Update update_;
} handler(context, request, writer);
absl::Status result = proxy_.Subscribe(std::move(*http_request), &handler);
return ConvertStatus(result);
}
static Proxy::RegisterService<milotic_grpc_proxy::VoyagerTelemetryOptions,
ProxyVoyagerImpl>
voyager_service_reg;
} // namespace milotic