| |
| #include "proxy_voyager_impl.h" |
| |
| #include <cstdint> |
| #include <memory> |
| #include <string> |
| #include <utility> |
| |
| #include "absl/log/log.h" |
| #include "absl/status/status.h" |
| #include "absl/status/statusor.h" |
| #include "absl/strings/ascii.h" |
| #include "absl/strings/match.h" |
| #include "absl/strings/str_cat.h" |
| #include "absl/strings/string_view.h" |
| #include "absl/time/time.h" |
| #include "grpc/status.h" |
| #include "grpcpp/impl/service_type.h" |
| #include "grpcpp/security/auth_context.h" |
| #include "grpcpp/server.h" |
| #include "grpcpp/server_context.h" |
| #include "grpcpp/support/status.h" |
| #include "grpcpp/support/sync_stream.h" |
| #include "proxy.h" |
| #include "proxy_config.pb.h" |
| #include "redfish_plugin.h" |
| #include "request_response.h" |
| #include "sse/sse_parser.h" |
| #include "google/protobuf/map.h" |
| #include "google/protobuf/message.h" |
| #include "voyager/priority_queue.hpp" |
| #include "voyager/voyager_telemetry.pb.h" |
| #include "voyager/fqp.h" |
| |
| namespace milotic { |
| |
| using ::third_party_voyager::DataPoint; |
| using ::third_party_voyager::Fqp; |
| using ::third_party_voyager::RequestFqp; |
| using ::third_party_voyager::Update; |
| |
| constexpr absl::string_view kReqIdHeader = "vbmc-internal:ReqID"; |
| constexpr int kLogThrottleFrequencySec = 10; |
| |
| static absl::StatusOr<std::unique_ptr<ProxyRequest>> CreateRequestHelper( |
| const Proxy &proxy, RedfishPlugin::RequestVerb verb, |
| const RequestFqp &req_fqp, |
| const google::protobuf::Map<std::string, std::string> &http_headers, |
| const grpc::AuthContext &auth_context) { |
| absl::StatusOr<std::string> path = FqpToPathStr(req_fqp.fqp()); |
| if (!path.ok()) { |
| return path.status(); |
| } |
| absl::StatusOr<std::unique_ptr<ProxyRequest>> status_or_http_request = |
| proxy.CreateAuthorizedRequest(verb, *path, auth_context); |
| if (!status_or_http_request.ok()) return status_or_http_request; |
| std::unique_ptr<ProxyRequest> http_request = |
| *std::move(status_or_http_request); |
| /* Copy http request headers to request */ |
| for (const auto &[key, value] : http_headers) { |
| // Never override "Host" header. |
| if (absl::EqualsIgnoreCase(key, "host")) continue; |
| http_request->AddHeader(key, value); |
| } |
| if (req_fqp.export_interval_ns() > 0) { |
| http_request->AddHeader( |
| kExportIntervalHeaderKey, |
| absl::FormatDuration(absl::Nanoseconds(req_fqp.export_interval_ns()))); |
| } |
| return http_request; |
| } |
| |
| // Create Voyager Update from Http response. |
| static void SetUpdateFromHTTP(Update *update, const ProxyResponse &response, |
| const Fqp &fqp) { |
| /* Copy response headers back */ |
| for (const auto &[key, value] : response.GetHeaders()) { |
| // HTTP headers names are not case sensitive. Changing consistently to |
| // lowercase allows for easier comparisons, and is consistent with HTTP2 |
| // headers. |
| std::string key_lower = key; |
| absl::AsciiStrToLower(&key_lower); |
| if (value.empty()) continue; |
| (*update->mutable_http_headers())[std::move(key_lower)] = value; |
| } |
| |
| if (!response.GetBody().empty()) { |
| DataPoint *data_point = update->add_data_points(); |
| *data_point->mutable_res_fqp() = fqp; |
| data_point->set_json(response.GetBody()); |
| } |
| update->set_code(response.GetCode()); |
| } |
| |
| static Proxy::RequestPriority MapRequestPriority( |
| third_party_voyager::FqpPriority priority) { |
| switch (priority) { |
| case third_party_voyager::FQP_PRI_LOW: |
| return Proxy::RequestPriority::kLow; |
| case third_party_voyager::FQP_PRI_MED: |
| return Proxy::RequestPriority::kHigh; |
| case third_party_voyager::FQP_PRI_HIG: |
| return Proxy::RequestPriority::kCrit; |
| // Default to low priority |
| case third_party_voyager::FQP_PRI_UNSPECIFIED: |
| return Proxy::RequestPriority::kLow; |
| default: |
| LOG_EVERY_N_SEC(WARNING, kLogThrottleFrequencySec) |
| << "Invalid priority " << priority; |
| return Proxy::RequestPriority::kLow; |
| } |
| } |
| |
| grpc::Status ProxyVoyagerImpl::DispatchRequest( |
| RedfishPlugin::RequestVerb verb, |
| const third_party_voyager::Request *request, |
| third_party_voyager::Update *response, |
| const grpc::AuthContext &auth_context) { |
| response->set_req_id(request->req_id()); |
| if (request->req_fqp().empty()) { |
| return grpc::Status::OK; |
| } |
| if (request->req_fqp().size() > 1) { |
| LOG_EVERY_N_SEC(WARNING, kLogThrottleFrequencySec) |
| << proxy_.name() << " - Ignoring multiple requests"; |
| } |
| const RequestFqp &req_fqp = request->req_fqp()[0]; |
| if (req_fqp.encoding() != |
| third_party_voyager::Encoding::ENCODING_UNSPECIFIED && |
| req_fqp.encoding() != third_party_voyager::Encoding::ENCODING_JSON) { |
| // TODO: b/322826898 - Support non-json payloads |
| return grpc::Status( |
| static_cast<grpc::StatusCode>(GRPC_STATUS_UNIMPLEMENTED), |
| "Unsupported encoding"); |
| } |
| |
| absl::StatusOr<std::unique_ptr<ProxyRequest>> http_request = |
| CreateRequestHelper(proxy_, verb, req_fqp, request->http_headers(), |
| auth_context); |
| if (!http_request.ok()) return ConvertStatus(http_request.status()); |
| |
| http_request.value()->AddHeader(kReqIdHeader, request->req_id()); |
| |
| absl::StatusOr<std::unique_ptr<Proxy::RequestJob>> job = |
| proxy_.DispatchRequestToQueue(verb, std::move(*http_request), |
| MapRequestPriority(req_fqp.priority())); |
| |
| if (!job.ok()) return ConvertStatus(job.status()); |
| |
| if (job.value()->Wait() != voyager::Job::JobState::kDone) { |
| return grpc::Status(static_cast<grpc::StatusCode>(GRPC_STATUS_INTERNAL), |
| std::string("Job error")); |
| } |
| absl::StatusOr<ProxyResponse> &result = job.value()->response(); |
| |
| if (!result.ok()) return ConvertStatus(result.status()); |
| |
| SetUpdateFromHTTP(response, *result, req_fqp.fqp()); |
| return grpc::Status::OK; |
| } |
| |
| grpc::Status ProxyVoyagerImpl::DispatchRequest( |
| RedfishPlugin::RequestVerb verb, |
| const third_party_voyager::SetRequest *request, |
| third_party_voyager::Update *response, |
| const grpc::AuthContext &auth_context) { |
| response->set_req_id(request->req_id()); |
| if (request->req_fqp().empty()) { |
| return grpc::Status::OK; |
| } |
| if (request->req_fqp().size() > 1) { |
| LOG_EVERY_N_SEC(WARNING, kLogThrottleFrequencySec) |
| << proxy_.name() << " - Ignoring multiple requests"; |
| } |
| const RequestFqp &req_fqp = request->req_fqp()[0]; |
| absl::StatusOr<std::unique_ptr<ProxyRequest>> http_request = |
| CreateRequestHelper(proxy_, verb, req_fqp, request->http_headers(), |
| auth_context); |
| if (!http_request.ok()) return ConvertStatus(http_request.status()); |
| |
| http_request.value()->AddHeader(kReqIdHeader, request->req_id()); |
| |
| if (request->has_json()) { |
| http_request.value()->SetBody(request->json()); |
| // This is declared inside the if block to ensure empty Content-Type header |
| // is not added to http_request. |
| if (http_request.value()->GetHeader("Content-Type").empty()) { |
| http_request.value()->AddHeader("Content-Type", "application/json"); |
| } |
| } else if (request->has_key_value()) { |
| LOG_EVERY_N_SEC(WARNING, kLogThrottleFrequencySec) |
| << proxy_.name() |
| << " - Unsupported request type key-value for fqp: " << req_fqp.fqp(); |
| return grpc::Status( |
| static_cast<grpc::StatusCode>(GRPC_STATUS_UNIMPLEMENTED), |
| "Unsupported request type"); |
| } else if (request->has_native()) { |
| // TODO: b/307607427 - Pass native data to plugins w/o http request |
| http_request.value()->SetBody(request->native().SerializeAsString()); |
| } |
| |
| absl::StatusOr<std::unique_ptr<Proxy::RequestJob>> job = |
| proxy_.DispatchRequestToQueue(verb, std::move(*http_request), |
| MapRequestPriority(req_fqp.priority())); |
| if (!job.ok()) return ConvertStatus(job.status()); |
| |
| if (job.value()->Wait() != voyager::Job::JobState::kDone) { |
| return grpc::Status(static_cast<grpc::StatusCode>(GRPC_STATUS_INTERNAL), |
| "Job error"); |
| } |
| |
| absl::StatusOr<ProxyResponse> &result = job.value()->response(); |
| if (!result.ok()) return ConvertStatus(result.status()); |
| |
| SetUpdateFromHTTP(response, *result, req_fqp.fqp()); |
| return grpc::Status::OK; |
| } |
| |
| grpc::Status ProxyVoyagerImpl::Subscribe( |
| grpc::ServerContext *context, const third_party_voyager::Request *request, |
| grpc::ServerWriter<third_party_voyager::Update> *writer) { |
| if (request->req_fqp().empty()) { |
| return grpc::Status::OK; |
| } |
| if (request->req_fqp().size() > 1) { |
| LOG(WARNING) << proxy_.name() << " - Ignoring multiple requests"; |
| } |
| const RequestFqp &req_fqp = request->req_fqp()[0]; |
| |
| absl::StatusOr<std::unique_ptr<ProxyRequest>> http_request = |
| CreateRequestHelper(proxy_, RedfishPlugin::RequestVerb::kSubscribe, |
| req_fqp, request->http_headers(), |
| *context->auth_context()); |
| if (!http_request.ok()) return ConvertStatus(http_request.status()); |
| |
| http_request.value()->AddHeader(kReqIdHeader, request->req_id()); |
| |
| class EventHandler : public RedfishPlugin::EventHandler { |
| public: |
| EventHandler(grpc::ServerContext *context, |
| const third_party_voyager::Request *request, |
| grpc::ServerWriter<third_party_voyager::Update> *writer) |
| : context_(context), writer_(writer), req_fqp_(request->req_fqp()[0]) { |
| update_.set_req_id(request->req_id()); |
| } |
| absl::Status OnResponse(const ProxyResponse &http_response) override { |
| writer_->SendInitialMetadata(); |
| if (context_->IsCancelled()) { |
| return absl::CancelledError(); |
| } |
| SetUpdateFromHTTP(&update_, http_response, req_fqp_.fqp()); |
| return absl::OkStatus(); |
| } |
| // Raw SSE events are packed into the key_value field using the event field |
| // names as keys. |
| absl::Status OnEvent(const ServerSentEvent &event) override { |
| update_.clear_data_points(); |
| third_party_voyager::DataPoint *data_point = update_.add_data_points(); |
| *data_point->mutable_res_fqp() = req_fqp_.fqp(); |
| auto &fields = *data_point->mutable_key_value()->mutable_fields(); |
| if (event.event) { |
| fields["event"].set_string_val(*event.event); |
| } |
| if (event.data) { |
| fields["data"].set_string_val(*event.data); |
| } |
| if (event.id) { |
| fields["id"].set_string_val(*event.id); |
| } |
| if (event.retry) { |
| fields["retry"].set_uint_val(static_cast<uint64_t>(*event.retry)); |
| } |
| if (!writer_->Write(update_)) { |
| return absl::CancelledError(); |
| } |
| return absl::OkStatus(); |
| } |
| // Plugins may instead generate Update messages directly, in which case they |
| // are merged with the default Update which includes the result code and |
| // headers from the initial response. |
| absl::Status OnEvent(const google::protobuf::Message &event_message) override { |
| if (event_message.GetTypeName() != update_.GetTypeName()) { |
| return absl::InvalidArgumentError(absl::StrCat( |
| "Unsupported event message: ", event_message.GetTypeName())); |
| } |
| update_.clear_data_points(); |
| update_.MergeFrom(event_message); |
| for (auto &data_point : *update_.mutable_data_points()) { |
| *data_point.mutable_res_fqp() = req_fqp_.fqp(); |
| } |
| if (!writer_->Write(update_)) { |
| return absl::CancelledError(); |
| } |
| return absl::OkStatus(); |
| } |
| |
| bool IsCancelled() const override { return context_->IsCancelled(); } |
| |
| private: |
| grpc::ServerContext *context_; |
| grpc::ServerWriter<third_party_voyager::Update> *writer_; |
| const RequestFqp &req_fqp_; |
| third_party_voyager::Update update_; |
| } handler(context, request, writer); |
| absl::Status result = proxy_.Subscribe(std::move(*http_request), &handler); |
| return ConvertStatus(result); |
| } |
| |
| static Proxy::RegisterService<milotic_grpc_proxy::VoyagerTelemetryOptions, |
| ProxyVoyagerImpl> |
| voyager_service_reg; |
| |
| } // namespace milotic |