| #include "redfish_passthrough_plugin.h" |
| |
| #include <cstdint> |
| #include <memory> |
| #include <optional> |
| #include <string> |
| #include <tuple> |
| #include <utility> |
| |
| #include "absl/status/status.h" |
| #include "absl/status/statusor.h" |
| #include "absl/strings/match.h" |
| #include "absl/strings/string_view.h" |
| #include "absl/time/time.h" |
| #include "redfish_query_engine/http/client.h" |
| #include "redfish_query_engine/cred.pb.h" |
| #include "redfish_query_engine/http/curl_client.h" |
| #include "metrics.h" |
| #include "monitoring.h" |
| #include "proxy_builder.h" |
| #include "proxy_config.pb.h" |
| #include "redfish_plugin.h" |
| #include "request_response.h" |
| #include "sse/sse_parser.h" |
| |
| namespace milotic { |
| |
// Fallback values substituted by GetConfigValue() when the matching field of
// the plugin config is 0. Each constant's type matches the type of the config
// accessor it is paired with, so template deduction in GetConfigValue() works.

// Fallback for config.request_timeout_msec().
constexpr uint64_t kRequestTimeoutMSec{30000};
// Fallback for config.connect_timeout_msec().
constexpr uint64_t kConnectTimeoutMSec{5000};
// Fallback for config.dns_timeout_sec().
constexpr int64_t kDnsTimeoutSec{60};
// Fallback for config.max_recv_speed().
// NOTE(review): the meaning of -1 is defined by CurlHttpClient - presumably
// "no limit"; confirm.
constexpr int64_t kMaxRecvSpeed{-1};
// Fallback for config.max_concurrent_requests().
constexpr uint32_t kMaxConcurrentRequests{16};
// Fallback for config.acquire_client_timeout_sec(); wrapped in absl::Seconds()
// at the point of use.
constexpr int64_t kAcquireClientTimeoutSec{10};
| |
// Picks between a configured value and its fallback.
// Only intended for numeric types whose default value is 0: a config_value of
// 0 yields override_value, any other config_value is returned unchanged.
template <typename T>
T GetConfigValue(T config_value, T override_value) {
  return config_value == 0 ? override_value : config_value;
}
| |
| // Case insensitive search for header |
| static std::optional<absl::string_view> GetHeader( |
| const ecclesia::HttpClient::HttpHeaders& headers, absl::string_view name) { |
| for (const auto& [key, value] : headers) { |
| if (absl::EqualsIgnoreCase(key, name)) { |
| return value; |
| } |
| } |
| return std::nullopt; |
| } |
| |
| RedfishPassthroughPlugin::RedfishPassthroughPlugin( |
| const milotic_grpc_proxy::Plugin::RedfishPassthrough& config) |
| : RedfishPassthroughPluginBase(&curl_http_client_, |
| &subscribe_curl_http_client_), |
| curl_http_client_( |
| [&config]() -> std::unique_ptr<ecclesia::HttpClient> { |
| return std::make_unique<ecclesia::CurlHttpClient>( |
| ecclesia::LibCurlProxy::CreateInstance(), |
| ecclesia::HttpCredential(), |
| ecclesia::CurlHttpClient::Config{ |
| .raw = false, |
| .request_timeout_msec = GetConfigValue( |
| config.request_timeout_msec(), kRequestTimeoutMSec), |
| .connect_timeout_msec = GetConfigValue( |
| config.connect_timeout_msec(), kConnectTimeoutMSec), |
| .dns_timeout = static_cast<int>(GetConfigValue( |
| config.dns_timeout_sec(), kDnsTimeoutSec)), |
| .max_recv_speed = static_cast<int>( |
| GetConfigValue(config.max_recv_speed(), kMaxRecvSpeed)), |
| }); |
| }, |
| {.max_concurrent_requests = GetConfigValue( |
| config.max_concurrent_requests(), kMaxConcurrentRequests), |
| .acquire_client_timeout = |
| absl::Seconds(GetConfigValue(config.acquire_client_timeout_sec(), |
| kAcquireClientTimeoutSec))}), |
| subscribe_curl_http_client_( |
| [&config]() -> std::unique_ptr<ecclesia::HttpClient> { |
| return std::make_unique<ecclesia::CurlHttpClient>( |
| ecclesia::LibCurlProxy::CreateInstance(), |
| ecclesia::HttpCredential(), |
| ecclesia::CurlHttpClient::Config{ |
| .raw = false, |
| .request_timeout_msec = 0, |
| .connect_timeout_msec = GetConfigValue( |
| config.connect_timeout_msec(), kConnectTimeoutMSec), |
| .dns_timeout = static_cast<int>(GetConfigValue( |
| config.dns_timeout_sec(), kDnsTimeoutSec)), |
| .max_recv_speed = static_cast<int>( |
| GetConfigValue(config.max_recv_speed(), kMaxRecvSpeed)), |
| .low_speed_limit = |
| config.sse_low_speed_limit_bytes_per_sec(), |
| .low_speed_time = config.sse_low_speed_time_sec(), |
| }); |
| }, |
| {.max_concurrent_requests = GetConfigValue( |
| config.max_concurrent_requests(), kMaxConcurrentRequests), |
| .acquire_client_timeout = |
| absl::Seconds(GetConfigValue(config.acquire_client_timeout_sec(), |
| kAcquireClientTimeoutSec))}) {} |
| |
| absl::StatusOr<ProxyResponse> RedfishPassthroughPluginBase ::HandleRequest( |
| RedfishPlugin::RequestVerb verb, |
| std::unique_ptr<ProxyRequest> http_request) { |
| // Making a copy to pass to latency monitor as request gets moved. |
| const std::string request_path = std::string(http_request->GetPath()); |
| LatencyMonitor latency_monitor( |
| &CommonMetrics::Get().passthrough_request_latency, |
| {VerbToString(verb), request_path}); |
| absl::StatusOr<ecclesia::HttpClient::HttpResponse> http_response; |
| auto http_client_request = http_request->ToHttpClientRequest(); |
| switch (verb) { |
| case RedfishPlugin::RequestVerb::kPost: |
| http_response = http_client_.Post(std::move(http_client_request)); |
| break; |
| case RedfishPlugin::RequestVerb::kGet: |
| http_response = http_client_.Get(std::move(http_client_request)); |
| break; |
| case RedfishPlugin::RequestVerb::kPatch: |
| http_response = http_client_.Patch(std::move(http_client_request)); |
| break; |
| case RedfishPlugin::RequestVerb::kDelete: |
| http_response = http_client_.Delete(std::move(http_client_request)); |
| break; |
| default: |
| return absl::UnimplementedError("HTTP method not implemented"); |
| } |
| if (!http_response.ok()) { |
| return CaptureResponseCode(http_response.status(), |
| &CommonMetrics::Get().passthrough_response_code, |
| request_path, /*field_values=*/{}); |
| ; |
| } |
| return CaptureResponseCode(ProxyResponse(*http_response), |
| &CommonMetrics::Get().passthrough_response_code, |
| request_path, /*field_values=*/{}); |
| } |
| |
| absl::Status RedfishPassthroughPluginBase ::Subscribe( |
| std::unique_ptr<ProxyRequest> request, EventHandler* handler) { |
| class IncrementalResponseHandler |
| : public ecclesia::HttpClient::IncrementalResponseHandler { |
| public: |
| IncrementalResponseHandler(EventHandler* handler, std::string path) |
| : handler_(handler), |
| path_(std::move(path)), |
| latency_monitor_( |
| std::in_place, &CommonMetrics::Get().passthrough_request_latency, |
| std::make_tuple( |
| VerbToString(RedfishPlugin::RequestVerb::kSubscribe), |
| std::string_view(path_))) {} |
| absl::Status OnResponseHeaders( |
| const ecclesia::HttpClient::HttpResponse& response) override { |
| latency_monitor_.reset(); |
| absl::Status status = handler_->OnResponse(ProxyResponse(response)); |
| if (!status.ok()) return status; |
| std::optional<absl::string_view> content_type = |
| GetHeader(response.headers, "Content-type"); |
| if (!content_type || |
| !absl::StartsWithIgnoreCase(*content_type, "text/event-stream")) { |
| return absl::InvalidArgumentError("Not an event stream"); |
| } |
| return absl::OkStatus(); |
| } |
| absl::Status OnBodyData(absl::string_view data) override { |
| return sse_parser_.ParseData(data, [this](const ServerSentEvent& event) { |
| CommonMetrics::Get().sse_event_counter.Increment(path_); |
| return handler_->OnEvent(event); |
| }); |
| } |
| |
| bool IsCancelled() const override { return handler_->IsCancelled(); } |
| |
| private: |
| EventHandler* handler_; |
| SseParser sse_parser_; |
| std::string path_; |
| std::optional<LatencyMonitor<absl::string_view, absl::string_view>> |
| latency_monitor_; |
| } request_handler(handler, std::string(request->GetPath())); |
| |
| return subscribe_http_client_.GetIncremental(request->ToHttpClientRequest(), |
| &request_handler); |
| } |
| |
| absl::Status RedfishPassthroughPluginBase ::Initialize(Proxy* proxy) { |
| return absl::OkStatus(); |
| } |
| |
// Invokes the plugin registration macro for RedfishPassthroughPlugin with the
// identifier `redfish_passthrough`.
// NOTE(review): the macro is defined elsewhere - presumably it makes the
// plugin selectable under that name; confirm against its definition.
REGISTER_REDFISH_PLUGIN(redfish_passthrough, RedfishPassthroughPlugin);
| |
| } // namespace milotic |