blob: 2a893ba3e4189ffd210940dabb3ba71dfbdd3868 [file] [log] [blame] [edit]
#include "proxy_builder.h"
#include <fcntl.h>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/time/time.h"
#include "absl/types/span.h"
#include "grpcpp/security/authorization_policy_provider.h"
#include "proxy.h"
#include "proxy_config.pb.h"
#include "redfish_plugin.h"
#include "request_response.h"
#include "resource_authz.h"
#include "ssh_client.h"
#include "google/protobuf/io/zero_copy_stream_impl.h"
#include "google/protobuf/text_format.h"
namespace milotic {
// Queue size used when the configuration has no queue options or specifies a
// non-positive queue size.
constexpr size_t kDefaultQueueSize = 4;
// Quantum passed to the queue when the configuration has no queue options or
// specifies a non-positive quantum.
constexpr int kDefaultQuantum = 1;
// SSH port used when the proxy's SSH options leave the port at 0.
constexpr uint16_t kDefaultSshPort = 22;
namespace internal {
// Reads the text-format Configuration proto stored at `config_file_path`.
//
// Returns:
//   - NotFound if the file cannot be opened for reading.
//   - InvalidArgument if the contents cannot be parsed as a Configuration.
//   - The parsed Configuration otherwise.
absl::StatusOr<milotic_grpc_proxy::Configuration> LoadConfig(
    const std::string& config_file_path) {
  const int config_fd = open(config_file_path.c_str(), O_RDONLY);
  if (config_fd < 0) {
    return absl::NotFoundError(
        absl::StrCat("Failed to open config file ", config_file_path));
  }

  // The stream takes ownership of the descriptor, so it is closed on every
  // exit path below.
  google::protobuf::io::FileInputStream input_stream(config_fd);
  input_stream.SetCloseOnDelete(true);

  milotic_grpc_proxy::Configuration parsed_config;
  const bool parsed_ok =
      google::protobuf::TextFormat::Parse(&input_stream, &parsed_config);
  if (!parsed_ok) {
    return absl::InvalidArgumentError(
        absl::StrCat("Failed to parse config file ", config_file_path));
  }
  return parsed_config;
}
} // namespace internal
// Offers `plugin_config` to each registered factory function in order and
// returns the first non-null plugin produced. Returns nullptr when no
// registered function yields a plugin for this configuration.
std::unique_ptr<RedfishPlugin> PluginFactory::CreatePlugin(
    const milotic_grpc_proxy::Plugin& plugin_config) {
  for (const auto& factory_fn : GetFunctionList()) {
    std::unique_ptr<RedfishPlugin> candidate = factory_fn(plugin_config);
    if (candidate != nullptr) {
      return candidate;
    }
  }
  return nullptr;
}
// Loads the Configuration from `config_file_path` and forwards it to the
// Configuration-based overload. A failure to load the file is returned
// unchanged.
absl::StatusOr<ProxyBuilder> ProxyBuilder::CreateAndStart(
    const std::string& config_file_path, const Options& options) {
  absl::StatusOr<milotic_grpc_proxy::Configuration> loaded_config =
      internal::LoadConfig(config_file_path);
  if (loaded_config.ok()) {
    return CreateAndStart(*loaded_config, options);
  }
  return loaded_config.status();
}
// Creates a ProxyBuilder from `config` and adds (and starts) one proxy per
// entry in `config.proxy_configuration()`.
//
// Queue size and quantum come from `config.queue_options()` when present and
// positive; otherwise kDefaultQueueSize / kDefaultQuantum are used.
//
// Returns InvalidArgument when there are no proxy configurations or when the
// configured quantum exceeds the queue size. The first AddProxy failure is
// logged and returned.
absl::StatusOr<ProxyBuilder> ProxyBuilder::CreateAndStart(
    const milotic_grpc_proxy::Configuration& config, const Options& options) {
  if (config.proxy_configuration().empty()) {
    return absl::InvalidArgumentError("No proxy configs.");
  }

  size_t queue_size = kDefaultQueueSize;
  int quantum = kDefaultQuantum;
  if (config.has_queue_options()) {
    const auto& queue_options = config.queue_options();
    if (queue_options.size() > 0) {
      queue_size = queue_options.size();
    }
    if (queue_options.quantum() > 0) {
      // The quantum may not exceed the (possibly defaulted) queue size.
      if (queue_options.quantum() > queue_size) {
        return absl::InvalidArgumentError(
            absl::StrCat("Invalid quantum number: ", queue_options.quantum()));
      }
      quantum = static_cast<int>(queue_options.quantum());
    }
  }

  ProxyBuilder builder(queue_size, quantum,
                       options.executor_factory(config.queue_options()));
  for (const auto& proxy_config : config.proxy_configuration()) {
    absl::Status add_status = builder.AddProxy(proxy_config, options);
    if (add_status.ok()) {
      continue;
    }
    LOG(ERROR) << "Failed to add proxy with config: " << proxy_config
               << " with status: " << add_status;
    return add_status;
  }
  return builder;
}
// Builds one Proxy from `proxy_config`, appends it to `proxies_`, registers
// its services and starts its gRPC server.
//
// Steps:
//   1. Instantiate every configured plugin through PluginFactory.
//   2. Collect the primary target and all fallback targets as hosts.
//   3. If SSH options are present, create an SshClient for those hosts.
//   4. Construct the Proxy, add its services, then configure and start gRPC.
//
// Returns InvalidArgument when a plugin is unsupported, a port exceeds the
// uint16_t range, or SSH options are given without a target or without an
// SshClientFactory. Errors from the SSH client factory, Proxy::AddService and
// Proxy::ConfigGrpcAndStart are returned unchanged.
//
// Note: the proxy is appended to `proxies_` before its services are added, so
// it remains in the list if a later step fails.
absl::Status ProxyBuilder::AddProxy(
    const milotic_grpc_proxy::ProxyConfiguration& proxy_config,
    const Options& options) {
  std::vector<std::unique_ptr<RedfishPlugin>> plugins;
  for (const auto& plugin_conf : proxy_config.plugins()) {
    std::unique_ptr<RedfishPlugin> plugin =
        PluginFactory::CreatePlugin(plugin_conf);
    if (plugin == nullptr) {
      return absl::InvalidArgumentError(
          absl::StrCat("Unsupported plugin: ", plugin_conf.name()));
    }
    plugin->SetName(plugin_conf.name());
    plugins.push_back(std::move(plugin));
  }

  // The primary target (if any) comes first, followed by the fallbacks.
  std::vector<Host> hosts;
  if (proxy_config.has_target()) {
    if (proxy_config.target().port() > std::numeric_limits<uint16_t>::max()) {
      return absl::InvalidArgumentError(
          absl::StrCat("Invalid port number: ", proxy_config.target().port()));
    }
    hosts.push_back(Host(proxy_config.target()));
  }
  for (const auto& fallback_target : proxy_config.fallback_targets()) {
    if (fallback_target.port() > std::numeric_limits<uint16_t>::max()) {
      return absl::InvalidArgumentError(
          absl::StrCat("Invalid port number: ", fallback_target.port()));
    }
    hosts.push_back(Host(fallback_target));
  }

  Proxy::Resources resources;
  if (proxy_config.has_ssh_options()) {
    if (!proxy_config.has_target()) {
      return absl::InvalidArgumentError(
          "SSH options require ProxyConfiguration.target");
    }
    if (!options.ssh_client_factory) {
      return absl::InvalidArgumentError("SshClientFactory not configured");
    }
    SshClient::CreationOptions ssh_options = {
        .max_connections = proxy_config.ssh_options().max_connections(),
        .connect_timeout =
            absl::Seconds(proxy_config.ssh_options().connect_timeout_sec()),
        .connect_retry_interval = absl::Milliseconds(
            proxy_config.ssh_options().connect_retry_interval_ms())};
    if (proxy_config.ssh_options().has_credentials_file()) {
      ssh_options.credentials_filename = {
          proxy_config.ssh_options().credentials_file().path().begin(),
          proxy_config.ssh_options().credentials_file().path().end()};
      ssh_options.private_cache_path =
          proxy_config.ssh_options().credentials_file().private_cache_path();
    }
    if (proxy_config.ssh_options().has_authz_server_settings()) {
      ssh_options.grpc_auth =
          proxy_config.ssh_options().authz_server_settings().grpc_auth();
      int port =
          proxy_config.ssh_options().authz_server_settings().oauth_port();
      if (port < 0 || port > std::numeric_limits<uint16_t>::max()) {
        return absl::InvalidArgumentError(
            absl::StrCat("Invalid oauth port number: ", port));
      }
      // A port of 0 leaves ssh_options.oauth_port at its default value.
      if (port != 0) {
        ssh_options.oauth_port = static_cast<uint16_t>(port);
      }
    }

    // A port of 0 selects the default SSH port.
    uint16_t ssh_port = kDefaultSshPort;
    if (proxy_config.ssh_options().port() >
        std::numeric_limits<uint16_t>::max()) {
      // Report the SSH port that failed validation, not the target's port.
      return absl::InvalidArgumentError(absl::StrCat(
          "Invalid SSH port number: ", proxy_config.ssh_options().port()));
    } else if (proxy_config.ssh_options().port() != 0) {
      ssh_port = static_cast<uint16_t>(proxy_config.ssh_options().port());
    }

    // The SSH client connects to the same hostnames as the proxy, but using
    // the SSH scheme and port.
    std::vector<Host> ssh_hosts;
    ssh_hosts.reserve(hosts.size());
    for (const auto& host : hosts) {
      ssh_hosts.push_back(Host(milotic_grpc_proxy::TARGET_SCHEME_SSH,
                               std::string(host.GetHostname()), ssh_port));
    }
    ssh_options.hosts = std::move(ssh_hosts);
    if (proxy_config.ssh_options().has_connector_config()) {
      ssh_options.connector_config =
          proxy_config.ssh_options().connector_config();
    }
    absl::StatusOr<std::unique_ptr<SshClient>> ssh_client =
        options.ssh_client_factory(std::move(ssh_options));
    if (!ssh_client.ok()) {
      return std::move(ssh_client).status();
    }
    resources.ssh_client = *std::move(ssh_client);
  }

  std::unique_ptr<PermissionChecker> permission_checker =
      options.permission_checker_factory ? options.permission_checker_factory()
                                         : nullptr;

  // Position of the new proxy in `proxies_`; used below to pick the matching
  // per-proxy credentials and authorization provider from `options`.
  size_t index = proxies_.size();
  proxies_.emplace_back(std::move(hosts), absl::MakeSpan(plugins), queue_.get(),
                        std::move(resources),
                        proxy_config.resource_authorization_policy(),
                        std::move(permission_checker), proxy_config.name());
  Proxy& proxy = proxies_.back();

  if (proxy_config.has_redfish_v1_options()) {
    absl::Status status = proxy.AddService(proxy_config.redfish_v1_options());
    if (!status.ok()) {
      return status;
    }
  }
  if (proxy_config.has_voyager_telemetry_options()) {
    absl::Status status =
        proxy.AddService(proxy_config.voyager_telemetry_options());
    if (!status.ok()) {
      return status;
    }
  }
  for (const auto& service : proxy_config.additional_services()) {
    absl::Status status = proxy.AddService(service);
    if (!status.ok()) {
      return status;
    }
  }

  // Credentials and providers are optional per proxy: when `options` has no
  // entry at `index`, the default-constructed / null value is used.
  Proxy::GrpcCredentials credentials;
  if (options.credentials.size() > index) {
    credentials = options.credentials[index];
  }
  std::shared_ptr<grpc::experimental::AuthorizationPolicyProviderInterface>
      provider = nullptr;
  if (options.authorization_providers.size() > index) {
    provider = options.authorization_providers[index];
  }
  return proxy.ConfigGrpcAndStart(proxy_config.grpc_configuration(),
                                  options.is_safe_uds_root, credentials,
                                  provider);
}
// Shuts down the queue without running jobs that are still enqueued.
ProxyBuilder::~ProxyBuilder() {
  // NOTE(review): queue_ is presumably null only for a moved-from builder;
  // in that case there is nothing to shut down.
  if (queue_ == nullptr) {
    return;
  }
  queue_->Shutdown(/*run_enqueued_jobs=*/false);
}
// Processes the queue repeatedly, `quantum_` at a time, until ProcessQueue
// reports a non-OK status.
//
// Returns OK if processing was cancelled, otherwise the failing status.
absl::Status ProxyBuilder::Wait() {
  for (;;) {
    absl::Status process_status = queue_->ProcessQueue(quantum_);
    if (process_status.ok()) {
      continue;
    }
    // Cancellation happens when another thread shuts the queue down. That is
    // the expected way to stop, so it is reported as success.
    if (absl::IsCancelled(process_status)) {
      return absl::OkStatus();
    }
    return process_status;
  }
}
// Shuts down the queue using Shutdown()'s default arguments.
void ProxyBuilder::Shutdown() {
  queue_->Shutdown();
}
} // namespace milotic