blob: a2ad4bc2dc3177979a9608cb5cd071aeb0166bb9 [file] [log] [blame] [edit]
#include "redfish_passthrough_plugin.h"
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <tuple>
#include <utility>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/match.h"
#include "absl/strings/string_view.h"
#include "absl/time/time.h"
#include "redfish_query_engine/http/client.h"
#include "redfish_query_engine/cred.pb.h"
#include "redfish_query_engine/http/curl_client.h"
#include "metrics.h"
#include "monitoring.h"
#include "proxy_builder.h"
#include "proxy_config.pb.h"
#include "redfish_plugin.h"
#include "request_response.h"
#include "sse/sse_parser.h"
namespace milotic {
const uint64_t kRequestTimeoutMSec = 30000;
const uint64_t kConnectTimeoutMSec = 5000;
const int64_t kDnsTimeoutSec = 60;
const int64_t kMaxRecvSpeed = -1;
const uint32_t kMaxConcurrentRequests = 16;
const int64_t kAcquireClientTimeoutSec = 10;
// Only intended for use with numeric types whose default value is 0
// Returns override_value if config_value is 0
template <typename T>
T GetConfigValue(T config_value, T override_value) {
if (config_value == 0) return override_value;
return config_value;
}
// Case insensitive search for header
static std::optional<absl::string_view> GetHeader(
const ecclesia::HttpClient::HttpHeaders& headers, absl::string_view name) {
for (const auto& [key, value] : headers) {
if (absl::EqualsIgnoreCase(key, name)) {
return value;
}
}
return std::nullopt;
}
RedfishPassthroughPlugin::RedfishPassthroughPlugin(
const milotic_grpc_proxy::Plugin::RedfishPassthrough& config)
: RedfishPassthroughPluginBase(&curl_http_client_,
&subscribe_curl_http_client_),
curl_http_client_(
[&config]() -> std::unique_ptr<ecclesia::HttpClient> {
return std::make_unique<ecclesia::CurlHttpClient>(
ecclesia::LibCurlProxy::CreateInstance(),
ecclesia::HttpCredential(),
ecclesia::CurlHttpClient::Config{
.raw = false,
.request_timeout_msec = GetConfigValue(
config.request_timeout_msec(), kRequestTimeoutMSec),
.connect_timeout_msec = GetConfigValue(
config.connect_timeout_msec(), kConnectTimeoutMSec),
.dns_timeout = static_cast<int>(GetConfigValue(
config.dns_timeout_sec(), kDnsTimeoutSec)),
.max_recv_speed = static_cast<int>(
GetConfigValue(config.max_recv_speed(), kMaxRecvSpeed)),
});
},
{.max_concurrent_requests = GetConfigValue(
config.max_concurrent_requests(), kMaxConcurrentRequests),
.acquire_client_timeout =
absl::Seconds(GetConfigValue(config.acquire_client_timeout_sec(),
kAcquireClientTimeoutSec))}),
subscribe_curl_http_client_(
[&config]() -> std::unique_ptr<ecclesia::HttpClient> {
return std::make_unique<ecclesia::CurlHttpClient>(
ecclesia::LibCurlProxy::CreateInstance(),
ecclesia::HttpCredential(),
ecclesia::CurlHttpClient::Config{
.raw = false,
.request_timeout_msec = 0,
.connect_timeout_msec = GetConfigValue(
config.connect_timeout_msec(), kConnectTimeoutMSec),
.dns_timeout = static_cast<int>(GetConfigValue(
config.dns_timeout_sec(), kDnsTimeoutSec)),
.max_recv_speed = static_cast<int>(
GetConfigValue(config.max_recv_speed(), kMaxRecvSpeed)),
.low_speed_limit =
config.sse_low_speed_limit_bytes_per_sec(),
.low_speed_time = config.sse_low_speed_time_sec(),
});
},
{.max_concurrent_requests = GetConfigValue(
config.max_concurrent_requests(), kMaxConcurrentRequests),
.acquire_client_timeout =
absl::Seconds(GetConfigValue(config.acquire_client_timeout_sec(),
kAcquireClientTimeoutSec))}) {}
absl::StatusOr<ProxyResponse> RedfishPassthroughPluginBase ::HandleRequest(
RedfishPlugin::RequestVerb verb,
std::unique_ptr<ProxyRequest> http_request) {
// Making a copy to pass to latency monitor as request gets moved.
const std::string request_path = std::string(http_request->GetPath());
LatencyMonitor latency_monitor(
&CommonMetrics::Get().passthrough_request_latency,
{VerbToString(verb), request_path});
absl::StatusOr<ecclesia::HttpClient::HttpResponse> http_response;
auto http_client_request = http_request->ToHttpClientRequest();
switch (verb) {
case RedfishPlugin::RequestVerb::kPost:
http_response = http_client_.Post(std::move(http_client_request));
break;
case RedfishPlugin::RequestVerb::kGet:
http_response = http_client_.Get(std::move(http_client_request));
break;
case RedfishPlugin::RequestVerb::kPatch:
http_response = http_client_.Patch(std::move(http_client_request));
break;
case RedfishPlugin::RequestVerb::kDelete:
http_response = http_client_.Delete(std::move(http_client_request));
break;
default:
return absl::UnimplementedError("HTTP method not implemented");
}
if (!http_response.ok()) {
return CaptureResponseCode(http_response.status(),
&CommonMetrics::Get().passthrough_response_code,
request_path, /*field_values=*/{});
;
}
return CaptureResponseCode(ProxyResponse(*http_response),
&CommonMetrics::Get().passthrough_response_code,
request_path, /*field_values=*/{});
}
absl::Status RedfishPassthroughPluginBase ::Subscribe(
std::unique_ptr<ProxyRequest> request, EventHandler* handler) {
class IncrementalResponseHandler
: public ecclesia::HttpClient::IncrementalResponseHandler {
public:
IncrementalResponseHandler(EventHandler* handler, std::string path)
: handler_(handler),
path_(std::move(path)),
latency_monitor_(
std::in_place, &CommonMetrics::Get().passthrough_request_latency,
std::make_tuple(
VerbToString(RedfishPlugin::RequestVerb::kSubscribe),
std::string_view(path_))) {}
absl::Status OnResponseHeaders(
const ecclesia::HttpClient::HttpResponse& response) override {
latency_monitor_.reset();
absl::Status status = handler_->OnResponse(ProxyResponse(response));
if (!status.ok()) return status;
std::optional<absl::string_view> content_type =
GetHeader(response.headers, "Content-type");
if (!content_type ||
!absl::StartsWithIgnoreCase(*content_type, "text/event-stream")) {
return absl::InvalidArgumentError("Not an event stream");
}
return absl::OkStatus();
}
absl::Status OnBodyData(absl::string_view data) override {
return sse_parser_.ParseData(data, [this](const ServerSentEvent& event) {
CommonMetrics::Get().sse_event_counter.Increment(path_);
return handler_->OnEvent(event);
});
}
bool IsCancelled() const override { return handler_->IsCancelled(); }
private:
EventHandler* handler_;
SseParser sse_parser_;
std::string path_;
std::optional<LatencyMonitor<absl::string_view, absl::string_view>>
latency_monitor_;
} request_handler(handler, std::string(request->GetPath()));
return subscribe_http_client_.GetIncremental(request->ToHttpClientRequest(),
&request_handler);
}
absl::Status RedfishPassthroughPluginBase ::Initialize(Proxy* proxy) {
return absl::OkStatus();
}
REGISTER_REDFISH_PLUGIN(redfish_passthrough, RedfishPassthroughPlugin);
} // namespace milotic