// blob: 6ade922e218ec9dd9d5bf5e4cee273534b018c8c [file] [log] [blame]
#include "proxy_builder.h"
#include <fcntl.h>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/time/time.h"
#include "absl/types/span.h"
#include "grpcpp/security/authorization_policy_provider.h"
#include "proxy.h"
#include "proxy_config.pb.h"
#include "redfish_plugin.h"
#include "resource_authz.h"
#include "ssh_client.h"
#include "google/protobuf/io/zero_copy_stream_impl.h"
#include "google/protobuf/text_format.h"
namespace milotic {
// Queue size used by CreateAndStart when the configuration carries no positive
// queue_options.size.
constexpr size_t kDefaultQueueSize = 4;
// Quantum handed to the queue when the configuration carries no positive
// queue_options.quantum.
constexpr int kDefaultQuantum = 1;
// SSH port used by AddProxy when ssh_options.port is left at 0.
constexpr uint16_t kDefaultSshPort = 22;
namespace internal {
// Reads the text-format proxy Configuration stored at `config_file_path`.
//
// Returns:
//   - the parsed Configuration on success;
//   - NotFoundError when the file cannot be opened for reading;
//   - InvalidArgumentError when the contents are not valid text format.
absl::StatusOr<milotic_grpc_proxy::Configuration> LoadConfig(
    const std::string& config_file_path) {
  const int config_fd = open(config_file_path.c_str(), O_RDONLY);
  if (config_fd < 0) {
    return absl::NotFoundError(
        absl::StrCat("Failed to open config file ", config_file_path));
  }

  // Hand the descriptor over to the stream so that it is closed when the
  // stream is destroyed, on both the success and the error path.
  google::protobuf::io::FileInputStream input(config_fd);
  input.SetCloseOnDelete(true);

  milotic_grpc_proxy::Configuration parsed;
  if (google::protobuf::TextFormat::Parse(&input, &parsed)) {
    return parsed;
  }
  return absl::InvalidArgumentError(
      absl::StrCat("Failed to parse config file ", config_file_path));
}
} // namespace internal
// Offers `plugin_config` to each registered factory function in order and
// returns the first plugin that one of them produces. Returns nullptr when no
// registered function yields a plugin for this configuration.
std::unique_ptr<RedfishPlugin> PluginFactory::CreatePlugin(
    const milotic_grpc_proxy::Plugin& plugin_config) {
  for (const auto& make_plugin : GetFunctionList()) {
    if (std::unique_ptr<RedfishPlugin> created = make_plugin(plugin_config)) {
      return created;
    }
  }
  return nullptr;
}
// Loads the configuration stored at `config_file_path` and forwards it to the
// Configuration overload of CreateAndStart. A load failure is returned
// unchanged.
absl::StatusOr<ProxyBuilder> ProxyBuilder::CreateAndStart(
    const std::string& config_file_path, const Options& options) {
  absl::StatusOr<milotic_grpc_proxy::Configuration> loaded =
      internal::LoadConfig(config_file_path);
  if (loaded.ok()) {
    return CreateAndStart(*loaded, options);
  }
  return loaded.status();
}
// Maps a configured target scheme to the URI prefix placed in front of the
// target hostname. Schemes other than HTTPS, HTTP and GRPC map to an empty
// prefix.
constexpr absl::string_view SchemeToString(
    milotic_grpc_proxy::TargetScheme scheme) {
  if (scheme == milotic_grpc_proxy::TargetScheme::TARGET_SCHEME_HTTPS) {
    return "https://";
  }
  if (scheme == milotic_grpc_proxy::TargetScheme::TARGET_SCHEME_HTTP) {
    return "http://";
  }
  if (scheme == milotic_grpc_proxy::TargetScheme::TARGET_SCHEME_GRPC) {
    return "dns:///";
  }
  return "";
}
// Creates a ProxyBuilder from an already parsed `config` and adds one proxy
// per proxy_configuration entry.
//
// Queue size and quantum come from config.queue_options when that message is
// present and the respective field is positive; otherwise the file-level
// defaults apply.
//
// Returns InvalidArgumentError when there are no proxy configurations or when
// the requested quantum exceeds the queue size. The first AddProxy failure is
// logged and returned as is.
absl::StatusOr<ProxyBuilder> ProxyBuilder::CreateAndStart(
    const milotic_grpc_proxy::Configuration& config, const Options& options) {
  if (config.proxy_configuration().empty()) {
    return absl::InvalidArgumentError("No proxy configs.");
  }

  const auto& queue_options = config.queue_options();
  size_t queue_size = kDefaultQueueSize;
  int quantum = kDefaultQuantum;
  if (config.has_queue_options()) {
    if (queue_options.size() > 0) {
      queue_size = queue_options.size();
    }
    if (queue_options.quantum() > 0) {
      // The quantum is validated against the effective queue size.
      if (queue_options.quantum() > queue_size) {
        return absl::InvalidArgumentError(absl::StrCat(
            "Invalid quantum number: ", queue_options.quantum()));
      }
      quantum = static_cast<int>(queue_options.quantum());
    }
  }

  // The executor factory always receives the queue options, even when they
  // were not set explicitly in the configuration.
  ProxyBuilder builder(queue_size, quantum,
                       options.executor_factory(queue_options));
  for (const milotic_grpc_proxy::ProxyConfiguration& entry :
       config.proxy_configuration()) {
    if (absl::Status added = builder.AddProxy(entry, options); !added.ok()) {
      LOG(ERROR) << "Failed to add proxy with config: " << entry
                 << " with status: " << added;
      return added;
    }
  }
  return builder;
}
// Builds one Proxy from `proxy_config`, appends it to proxies_, registers its
// services and starts its gRPC endpoint.
//
// Steps, in order:
//   1. Instantiate every configured plugin through PluginFactory.
//   2. Compose the target address "<scheme prefix><hostname>[:port]".
//   3. If ssh_options is present, create an SshClient through
//      options.ssh_client_factory.
//   4. Construct the Proxy and add the Redfish v1, Voyager telemetry and
//      additional services that are configured.
//   5. Start gRPC with the credentials / authorization provider found in
//      `options` at this proxy's index, when such an entry exists.
//
// Returns InvalidArgumentError for an unsupported plugin, an out-of-range
// port, SSH options without a target, or a missing SSH client factory. Errors
// from the SSH client factory, Proxy::AddService and
// Proxy::ConfigGrpcAndStart are returned unchanged.
absl::Status ProxyBuilder::AddProxy(
    const milotic_grpc_proxy::ProxyConfiguration& proxy_config,
    const Options& options) {
  // Create all plugins first so that an unsupported plugin is rejected before
  // any other resource is set up.
  std::vector<std::unique_ptr<RedfishPlugin>> plugins;
  for (const auto& plugin_conf : proxy_config.plugins()) {
    std::unique_ptr<RedfishPlugin> plugin =
        PluginFactory::CreatePlugin(plugin_conf);
    if (plugin == nullptr) {
      return absl::InvalidArgumentError(
          absl::StrCat("Unsupported plugin: ", plugin_conf.name()));
    }
    plugin->SetName(plugin_conf.name());
    plugins.push_back(std::move(plugin));
  }

  // The target address stays empty when no target is configured. A port of 0
  // means "not specified" and is left out of the address.
  std::string target_host;
  if (proxy_config.has_target()) {
    target_host = absl::StrCat(SchemeToString(proxy_config.target().scheme()),
                               proxy_config.target().hostname());
    if (proxy_config.target().port() != 0) {
      if (proxy_config.target().port() >
          std::numeric_limits<uint16_t>::max()) {
        return absl::InvalidArgumentError(absl::StrCat(
            "Invalid port number: ", proxy_config.target().port()));
      }
      absl::StrAppend(&target_host, ":", proxy_config.target().port());
    }
  }

  Proxy::Resources resources;
  if (proxy_config.has_ssh_options()) {
    // The SSH client connects to the target's hostname, so a target is
    // mandatory here.
    if (!proxy_config.has_target()) {
      return absl::InvalidArgumentError(
          "SSH options require ProxyConfiguration.target");
    }
    if (!options.ssh_client_factory) {
      return absl::InvalidArgumentError("SshClientFactory not configured");
    }
    SshClient::CreationOptions ssh_options = {
        .host = proxy_config.target().hostname(),
        .port = kDefaultSshPort,
        .max_connections = proxy_config.ssh_options().max_connections(),
        .connect_timeout =
            absl::Seconds(proxy_config.ssh_options().connect_timeout_sec()),
        .connect_retry_interval = absl::Milliseconds(
            proxy_config.ssh_options().connect_retry_interval_ms())};
    if (proxy_config.ssh_options().has_credentials_file()) {
      ssh_options.credentials_filename = {
          proxy_config.ssh_options().credentials_file().path().begin(),
          proxy_config.ssh_options().credentials_file().path().end()};
      ssh_options.private_cache_path =
          proxy_config.ssh_options().credentials_file().private_cache_path();
    }
    if (proxy_config.ssh_options().has_authz_server_settings()) {
      ssh_options.grpc_auth =
          proxy_config.ssh_options().authz_server_settings().grpc_auth();
      int port =
          proxy_config.ssh_options().authz_server_settings().oauth_port();
      if (port < 0 || port > std::numeric_limits<uint16_t>::max()) {
        return absl::InvalidArgumentError(
            absl::StrCat("Invalid oauth port number: ", port));
      }
      // An oauth port of 0 keeps whatever default CreationOptions carries.
      if (port != 0) {
        ssh_options.oauth_port = static_cast<uint16_t>(port);
      }
    }
    // An SSH port of 0 keeps kDefaultSshPort. The error reports the SSH port
    // that was actually rejected, not the target's port.
    if (proxy_config.ssh_options().port() >
        std::numeric_limits<uint16_t>::max()) {
      return absl::InvalidArgumentError(absl::StrCat(
          "Invalid port number: ", proxy_config.ssh_options().port()));
    } else if (proxy_config.ssh_options().port() != 0) {
      // The range was checked above, so the narrowing is safe.
      ssh_options.port =
          static_cast<uint16_t>(proxy_config.ssh_options().port());
    }
    absl::StatusOr<std::unique_ptr<SshClient>> ssh_client =
        options.ssh_client_factory(std::move(ssh_options));
    if (!ssh_client.ok()) {
      return std::move(ssh_client).status();
    }
    resources.ssh_client = *std::move(ssh_client);
  }

  // The permission checker is optional; without a factory the proxy receives
  // nullptr.
  std::unique_ptr<PermissionChecker> permission_checker =
      options.permission_checker_factory ? options.permission_checker_factory()
                                         : nullptr;

  // Capture the index before inserting: it selects this proxy's entries in
  // options.credentials and options.authorization_providers below.
  size_t index = proxies_.size();
  proxies_.emplace_back(target_host, absl::MakeSpan(plugins), queue_.get(),
                        std::move(resources),
                        proxy_config.resource_authorization_policy(),
                        std::move(permission_checker));
  Proxy& proxy = proxies_.back();

  if (proxy_config.has_redfish_v1_options()) {
    absl::Status status = proxy.AddService(proxy_config.redfish_v1_options());
    if (!status.ok()) {
      return status;
    }
  }
  if (proxy_config.has_voyager_telemetry_options()) {
    absl::Status status =
        proxy.AddService(proxy_config.voyager_telemetry_options());
    if (!status.ok()) {
      return status;
    }
  }
  for (const auto& service : proxy_config.additional_services()) {
    absl::Status status = proxy.AddService(service);
    if (!status.ok()) {
      return status;
    }
  }

  // Per-proxy credentials and authorization providers are matched by
  // position; a missing entry leaves the default-constructed value / nullptr.
  Proxy::GrpcCredentials credentials;
  if (options.credentials.size() > index) {
    credentials = options.credentials[index];
  }
  std::shared_ptr<grpc::experimental::AuthorizationPolicyProviderInterface>
      provider = nullptr;
  if (options.authorization_providers.size() > index) {
    provider = options.authorization_providers[index];
  }
  return proxy.ConfigGrpcAndStart(proxy_config.grpc_configuration(),
                                  options.is_safe_uds_root, credentials,
                                  provider);
}
// Shuts the queue down without running jobs that are still enqueued.
ProxyBuilder::~ProxyBuilder() {
  // NOTE(review): queue_ is presumably null only for a moved-from builder --
  // confirm against the class definition.
  if (queue_ == nullptr) {
    return;
  }
  queue_->Shutdown(/*run_enqueued_jobs=*/false);
}
// Processes the queue repeatedly until ProcessQueue reports a non-OK status.
// A cancelled status is reported as OK; any other error is returned unchanged.
absl::Status ProxyBuilder::Wait() {
  for (;;) {
    absl::Status result = queue_->ProcessQueue(quantum_);
    if (result.ok()) {
      continue;
    }
    // Shutting the queue down from another thread cancels ProcessQueue. That
    // is the expected way to stop waiting, so it is not treated as an error.
    if (absl::IsCancelled(result)) {
      return absl::OkStatus();
    }
    return result;
  }
}
// Shuts the job queue down, using the queue's default Shutdown arguments.
void ProxyBuilder::Shutdown() {
  queue_->Shutdown();
}
} // namespace milotic