// blob: d837d12a0e35ada7f500475246b6090bbd269ec6
#include "tlbmc/collector/metric_collector.h"
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/functional/any_invocable.h"
#include "absl/log/log.h"
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/time/time.h"
#include "boost/asio.hpp" //NOLINT: boost::asio is commonly used in BMC
#include "g3/macros.h"
#include "thread/thread.h"
#include "nlohmann/json.hpp"
#include "software_metrics_config.pb.h"
#include "tlbmc/metrics/software_metrics.h"
#include "resource.pb.h"
#include "software_metrics.pb.h"
#include "tlbmc/utils/command_executor.h"
#include "google/protobuf/json/json.h"
#include "google/protobuf/util/json_util.h"
#include "google/protobuf/util/message_differencer.h"
// Default period between successive metric refreshes; used as the scheduling
// interval in ScheduleMetricsRead().
constexpr absl::Duration kDefaultMetricSamplingInterval = absl::Seconds(30);
namespace milotic_tlbmc {
// Registers a recurring task on `thread_manager.task_scheduler` that calls
// SoftwareMetrics::RefreshOnce() every kDefaultMetricSamplingInterval, and
// records the returned task id in `thread_manager.task_id`.
//
// `params` is accepted but not read by this function. Always returns
// absl::OkStatus().
//
// NOTE(review): the task holds a raw pointer to the SoftwareMetrics object, so
// the scheduler presumably has to be stopped before that object is destroyed
// -- confirm against the owner's member/destruction order.
static absl::Status ScheduleMetricsRead(
    const std::unique_ptr<SoftwareMetrics>& metrics,
    const MetricCollector::Params& params,
    MetricThreadManager& thread_manager) {
  const absl::Duration sampling_period = kDefaultMetricSamplingInterval;
  LOG(INFO) << "Scheduling metric reads!"
            << " Interval is " << sampling_period;

  SoftwareMetrics* raw_metrics = metrics.get();
  auto refresh_task = [raw_metrics](absl::AnyInvocable<void()> on_done) {
    // NOTE(review): with a null pointer `on_done` is dropped without being
    // invoked -- confirm the scheduler tolerates that.
    if (raw_metrics == nullptr) {
      return;
    }
    raw_metrics->RefreshOnce(std::move(on_done));
  };

  thread_manager.task_id = thread_manager.task_scheduler->RunAndScheduleAsync(
      std::move(refresh_task), sampling_period);
  return absl::OkStatus();
}
// Creates the SoftwareMetrics object for `params.metric_configs` and starts a
// worker thread (from `params.thread_factory`) that runs a new io_context.
//
// On success the thread, the work guard and the io_context are appended to
// `thread_manager`, which takes over their lifetime. If SoftwareMetrics::Create
// fails, its status is returned and `thread_manager` is left untouched.
static absl::StatusOr<std::unique_ptr<SoftwareMetrics>> CreateSoftwareMetrics(
    const MetricCollector::Params& params,
    MetricThreadManager& thread_manager) {
  auto context = std::make_shared<boost::asio::io_context>();
  // The guard is created before the thread starts so that run() sees
  // outstanding work from the beginning.
  auto guard = boost::asio::make_work_guard(*context);

  ECCLESIA_ASSIGN_OR_RETURN(std::unique_ptr<SoftwareMetrics> software_metrics,
                            SoftwareMetrics::Create(params.metric_configs));

  // The thread body keeps its own shared_ptr copy of the io_context.
  thread_manager.threads.push_back(
      params.thread_factory->New([context]() { context->run(); }));
  thread_manager.work_guards.push_back(std::move(guard));
  thread_manager.io_contexts.push_back(std::move(context));
  return software_metrics;
}
// Test-only counterpart of CreateSoftwareMetrics(): builds the SoftwareMetrics
// object through SoftwareMetrics::CreateWithExecutorForUnitTest(), passing
// `params.metric_configs` and `params.executor_command_map`.
//
// On success a worker thread running a new io_context is started and the
// thread, work guard and io_context are appended to `thread_manager`. On
// failure the creation status is returned and `thread_manager` is untouched.
static absl::StatusOr<std::unique_ptr<SoftwareMetrics>>
CreateSoftwareMetricsForUnitTest(MetricCollector::Params& params,
                                 MetricThreadManager& thread_manager) {
  auto context = std::make_shared<boost::asio::io_context>();
  // The guard is created before the thread starts so that run() sees
  // outstanding work from the beginning.
  auto guard = boost::asio::make_work_guard(*context);

  ECCLESIA_ASSIGN_OR_RETURN(
      std::unique_ptr<SoftwareMetrics> software_metrics,
      SoftwareMetrics::CreateWithExecutorForUnitTest(
          params.metric_configs, params.executor_command_map));

  // The thread body keeps its own shared_ptr copy of the io_context.
  thread_manager.threads.push_back(
      params.thread_factory->New([context]() { context->run(); }));
  thread_manager.work_guards.push_back(std::move(guard));
  thread_manager.io_contexts.push_back(std::move(context));
  return software_metrics;
}
// Test hook: returns the command executor reported by the metrics table's
// GetExecutorForTest().
CommandExecutor* MetricCollector::GetSoftwareMetricsExecutorForTest() const {
  CommandExecutor* test_executor = metrics_table_->GetExecutorForTest();
  return test_executor;
}
// Shuts everything down in three ordered steps: stop the task scheduler, stop
// every io_context, then join every worker thread.
MetricThreadManager::~MetricThreadManager() {
  // Step 1: stop the scheduler.
  task_scheduler->Stop();

  // Step 2: stop each io_context.
  for (const auto& context : io_contexts) {
    context->stop();
  }

  // Step 3: join each worker thread.
  for (const auto& worker : threads) {
    worker->Join();
  }
}
// Serializes the socket-stat states held by the metrics table into JSON.
//
// Returns an object of the form {"Value": <states>}, where <states> is the
// proto rendered with its original (proto) field names. If the proto cannot be
// serialized, or the serialized text cannot be parsed back, an error is logged
// and an empty object is returned.
nlohmann::json MetricCollector::ExtractSocketStats() const {
  nlohmann::json::object_t response;

  ::google::protobuf::util::JsonPrintOptions opts;
  opts.preserve_proto_field_names = true;

  std::string json_string;
  if (!::google::protobuf::json::MessageToJsonString(
           metrics_table_->GetSocketStatStates(), &json_string, opts)
           .ok()) {
    LOG(ERROR) << "Failed to get socket stat data!";
    return response;
  }

  // Exceptions are disabled for the parse, so failure is reported through a
  // "discarded" value; never store such a value in the response.
  nlohmann::json parsed = nlohmann::json::parse(json_string, nullptr, false);
  if (parsed.is_discarded()) {
    LOG(ERROR) << "Failed to parse socket stat data!";
    return response;
  }
  response["Value"] = std::move(parsed);
  return response;
}
// Serializes the netfilter states held by the metrics table into JSON.
//
// Returns an object of the form {"Value": <states>}, where <states> is the
// proto rendered with its original (proto) field names. If the proto cannot be
// serialized, or the serialized text cannot be parsed back, an error is logged
// and an empty object is returned.
nlohmann::json MetricCollector::ExtractNetfilterStats() const {
  nlohmann::json::object_t response;

  ::google::protobuf::util::JsonPrintOptions opts;
  opts.preserve_proto_field_names = true;

  std::string json_string;
  if (!::google::protobuf::json::MessageToJsonString(
           metrics_table_->GetNetFilterStates(), &json_string, opts)
           .ok()) {
    LOG(ERROR) << "Failed to get netfilter data!";
    return response;
  }

  // Exceptions are disabled for the parse, so failure is reported through a
  // "discarded" value; never store such a value in the response.
  nlohmann::json parsed = nlohmann::json::parse(json_string, nullptr, false);
  if (parsed.is_discarded()) {
    LOG(ERROR) << "Failed to parse netfilter data!";
    return response;
  }
  response["Value"] = std::move(parsed);
  return response;
}
// Builds the JSON view of the collector: an object with the keys "SocketStats"
// and "NetfilterStats", holding the results of ExtractSocketStats() and
// ExtractNetfilterStats() respectively.
nlohmann::json MetricCollector::ToJson() const {
  nlohmann::json socket_stats = ExtractSocketStats();
  nlohmann::json netfilter_stats = ExtractNetfilterStats();

  nlohmann::json::object_t response;
  response["SocketStats"] = std::move(socket_stats);
  response["NetfilterStats"] = std::move(netfilter_stats);
  return response;
}
// Returns the software metrics values currently reported by the metrics table.
SoftwareMetricsValue MetricCollector::GetMetricValues() const {
  SoftwareMetricsValue values = metrics_table_->GetSoftwareMetricsValues();
  return values;
}
// Returns the socket-stat states currently reported by the metrics table.
SocketStatStates MetricCollector::GetSocketStatMetricsValues() const {
  SocketStatStates socket_states = metrics_table_->GetSocketStatStates();
  return socket_states;
}
// Returns the netfilter states currently reported by the metrics table.
NetFilterStates MetricCollector::GetNetFilterValues() const {
  NetFilterStates netfilter_states = metrics_table_->GetNetFilterStates();
  return netfilter_states;
}
// Factory for a fully wired MetricCollector.
//
// Creates the thread manager from `params.clock`, warns when
// `params.metric_configs` equals the default (empty) config, builds the
// SoftwareMetrics object, and schedules its periodic refresh. Returns the
// first error encountered while creating the metrics or scheduling the reads.
absl::StatusOr<std::unique_ptr<MetricCollector>> MetricCollector::Create(
    const MetricCollector::Params& params) {
  auto manager = std::make_unique<MetricThreadManager>(params.clock);

  const bool configs_are_default =
      ::google::protobuf::util::MessageDifferencer::Equals(
          params.metric_configs, SoftwareMetricsConfig::default_instance());
  if (configs_are_default) {
    LOG(WARNING) << "WARNING: software metrics configs are empty.";
  }

  ECCLESIA_ASSIGN_OR_RETURN(std::unique_ptr<SoftwareMetrics> software_metrics,
                            CreateSoftwareMetrics(params, *manager));
  ECCLESIA_RETURN_IF_ERROR(
      ScheduleMetricsRead(software_metrics, params, *manager));

  // WrapUnique + new rather than make_unique, as in the original code
  // (presumably because the constructor is not public -- not visible here).
  return absl::WrapUnique(
      new MetricCollector(std::move(software_metrics), std::move(manager)));
}
// Test-only factory: same wiring as Create(), except that the SoftwareMetrics
// object comes from CreateSoftwareMetricsForUnitTest() and no empty-config
// warning is emitted. Returns the first error encountered while creating the
// metrics or scheduling the reads.
absl::StatusOr<std::unique_ptr<MetricCollector>>
MetricCollector::CreateForUnitTest(MetricCollector::Params& params) {
  auto manager = std::make_unique<MetricThreadManager>(params.clock);

  ECCLESIA_ASSIGN_OR_RETURN(
      std::unique_ptr<SoftwareMetrics> software_metrics,
      CreateSoftwareMetricsForUnitTest(params, *manager));
  ECCLESIA_RETURN_IF_ERROR(
      ScheduleMetricsRead(software_metrics, params, *manager));

  return absl::WrapUnique(
      new MetricCollector(std::move(software_metrics), std::move(manager)));
}
// Factory for the placeholder collector; returns a default-constructed
// EmptyMetricCollector.
std::unique_ptr<EmptyMetricCollector> EmptyMetricCollector::Create() {
  auto collector = std::make_unique<EmptyMetricCollector>();
  return collector;
}
// Placeholder implementation: logs a warning and returns a value-initialized
// SoftwareMetricsValue.
SoftwareMetricsValue EmptyMetricCollector::GetMetricValues() const {
  LOG(WARNING) << "EmptyMetricCollector::GetMetricValues is "
                  "called. This will return an empty list.";
  SoftwareMetricsValue empty_values{};
  return empty_values;
}
// Placeholder implementation: logs a warning and returns a value-initialized
// SocketStatStates.
SocketStatStates EmptyMetricCollector::GetSocketStatMetricsValues() const {
  LOG(WARNING) << "EmptyMetricCollector::GetSocketStatMetricsValues is "
                  "called. This will return an empty list.";
  SocketStatStates empty_states{};
  return empty_states;
}
// Placeholder implementation: logs a warning naming this method and returns a
// default-constructed NetFilterStates.
NetFilterStates EmptyMetricCollector::GetNetFilterValues() const {
  // The warning names the actual method (GetNetFilterValues) so the log line
  // can be traced back to this call site, matching the sibling methods.
  LOG(WARNING) << "EmptyMetricCollector::GetNetFilterValues is "
                  "called. This will return an empty list.";
  return NetFilterStates();
}
// Placeholder implementation: returns the single-entry object
// {"Warning": "EmptyMetricCollector used."}.
nlohmann::json EmptyMetricCollector::GetSchedulerStats() const {
  nlohmann::json::object_t response;
  response["Warning"] = "EmptyMetricCollector used.";
  return response;
}
// Placeholder implementation: returns the single-entry object
// {"Warning": "EmptyMetricCollector used."}.
nlohmann::json EmptyMetricCollector::ToJson() const {
  nlohmann::json::object_t response;
  response["Warning"] = "EmptyMetricCollector used.";
  return response;
}
} // namespace milotic_tlbmc