| #include "tlbmc/collector/metric_collector.h" |
| |
| #include <functional> |
| #include <memory> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "absl/functional/any_invocable.h" |
| #include "absl/log/log.h" |
| #include "absl/memory/memory.h" |
| #include "absl/status/status.h" |
| #include "absl/status/statusor.h" |
| #include "absl/time/time.h" |
| #include "boost/asio.hpp" //NOLINT: boost::asio is commonly used in BMC |
| #include "g3/macros.h" |
| #include "thread/thread.h" |
| #include "nlohmann/json.hpp" |
| #include "software_metrics_config.pb.h" |
| #include "tlbmc/metrics/software_metrics.h" |
| #include "resource.pb.h" |
| #include "software_metrics.pb.h" |
| #include "tlbmc/utils/command_executor.h" |
| #include "google/protobuf/json/json.h" |
| #include "google/protobuf/util/json_util.h" |
| #include "google/protobuf/util/message_differencer.h" |
| |
| constexpr absl::Duration kDefaultMetricSamplingInterval = absl::Seconds(30); |
| |
| namespace milotic_tlbmc { |
| |
| static absl::Status ScheduleMetricsRead( |
| const std::unique_ptr<SoftwareMetrics>& metrics, |
| const MetricCollector::Params& params, |
| MetricThreadManager& thread_manager) { |
| absl::Duration interval = kDefaultMetricSamplingInterval; |
| |
| LOG(INFO) << "Scheduling metric reads!" |
| << " Interval is " << interval; |
| |
| thread_manager.task_id = thread_manager.task_scheduler->RunAndScheduleAsync( |
| [metrics = metrics.get()](absl::AnyInvocable<void()> on_done) { |
| if (metrics) { |
| metrics->RefreshOnce(std::move(on_done)); |
| } |
| }, |
| interval); |
| return absl::OkStatus(); |
| } |
| |
| static absl::StatusOr<std::unique_ptr<SoftwareMetrics>> CreateSoftwareMetrics( |
| const MetricCollector::Params& params, |
| MetricThreadManager& thread_manager) { |
| std::shared_ptr<boost::asio::io_context> io_context = |
| std::make_shared<boost::asio::io_context>(); |
| boost::asio::executor_work_guard<boost::asio::io_context::executor_type> |
| work_guard(boost::asio::make_work_guard(*io_context)); |
| ECCLESIA_ASSIGN_OR_RETURN(std::unique_ptr<SoftwareMetrics> metrics, |
| SoftwareMetrics::Create(params.metric_configs)); |
| thread_manager.threads.push_back( |
| params.thread_factory->New([io_context]() { io_context->run(); })); |
| thread_manager.work_guards.push_back(std::move(work_guard)); |
| thread_manager.io_contexts.push_back(std::move(io_context)); |
| |
| return metrics; |
| } |
| |
| static absl::StatusOr<std::unique_ptr<SoftwareMetrics>> |
| CreateSoftwareMetricsForUnitTest(MetricCollector::Params& params, |
| MetricThreadManager& thread_manager) { |
| std::shared_ptr<boost::asio::io_context> io_context = |
| std::make_shared<boost::asio::io_context>(); |
| boost::asio::executor_work_guard<boost::asio::io_context::executor_type> |
| work_guard(boost::asio::make_work_guard(*io_context)); |
| |
| ECCLESIA_ASSIGN_OR_RETURN( |
| std::unique_ptr<SoftwareMetrics> metrics, |
| SoftwareMetrics::CreateWithExecutorForUnitTest( |
| params.metric_configs, params.executor_command_map)); |
| |
| thread_manager.threads.push_back( |
| params.thread_factory->New([io_context]() { io_context->run(); })); |
| thread_manager.work_guards.push_back(std::move(work_guard)); |
| thread_manager.io_contexts.push_back(std::move(io_context)); |
| |
| return metrics; |
| } |
| |
| CommandExecutor* MetricCollector::GetSoftwareMetricsExecutorForTest() const { |
| return metrics_table_->GetExecutorForTest(); |
| } |
| |
| MetricThreadManager::~MetricThreadManager() { |
| // Stop the scheduler first. |
| task_scheduler->Stop(); |
| |
| // Stop io_contexts. |
| for (const std::shared_ptr<boost::asio::io_context>& io_context : |
| io_contexts) { |
| io_context->stop(); |
| } |
| |
| // Finally join all threads. |
| for (const std::unique_ptr<ecclesia::ThreadInterface>& thread : threads) { |
| thread->Join(); |
| } |
| } |
| nlohmann::json MetricCollector::ExtractSocketStats() const { |
| std::string json_string; |
| nlohmann::json::object_t response; |
| ::google::protobuf::util::JsonPrintOptions opts; |
| opts.preserve_proto_field_names = true; |
| |
| if (!::google::protobuf::json::MessageToJsonString( |
| metrics_table_->GetSocketStatStates(), &json_string, opts) |
| .ok()) { |
| LOG(ERROR) << "Failed to get socket stat data!"; |
| return response; |
| } |
| response["Value"] = nlohmann::json::parse(json_string, nullptr, false); |
| json_string.clear(); |
| return response; |
| } |
| |
| nlohmann::json MetricCollector::ExtractNetfilterStats() const { |
| std::string json_string; |
| nlohmann::json::object_t response; |
| ::google::protobuf::util::JsonPrintOptions opts; |
| opts.preserve_proto_field_names = true; |
| |
| if (!::google::protobuf::json::MessageToJsonString(metrics_table_->GetNetFilterStates(), |
| &json_string, opts) |
| .ok()) { |
| LOG(ERROR) << "Failed to get netfilter data!"; |
| return response; |
| } |
| response["Value"] = nlohmann::json::parse(json_string, nullptr, false); |
| json_string.clear(); |
| return response; |
| } |
| |
| nlohmann::json MetricCollector::ToJson() const { |
| nlohmann::json::object_t response; |
| response["SocketStats"] = ExtractSocketStats(); |
| response["NetfilterStats"] = ExtractNetfilterStats(); |
| return response; |
| } |
| |
| SoftwareMetricsValue MetricCollector::GetMetricValues() const { |
| return metrics_table_->GetSoftwareMetricsValues(); |
| } |
| |
| SocketStatStates MetricCollector::GetSocketStatMetricsValues() const { |
| return metrics_table_->GetSocketStatStates(); |
| } |
| |
| NetFilterStates MetricCollector::GetNetFilterValues() const { |
| return metrics_table_->GetNetFilterStates(); |
| } |
| |
| absl::StatusOr<std::unique_ptr<MetricCollector>> MetricCollector::Create( |
| const MetricCollector::Params& params) { |
| auto thread_manager = std::make_unique<MetricThreadManager>(params.clock); |
| |
| if (::google::protobuf::util::MessageDifferencer::Equals( |
| params.metric_configs, SoftwareMetricsConfig::default_instance())) { |
| LOG(WARNING) << "WARNING: software metrics configs are empty."; |
| } |
| ECCLESIA_ASSIGN_OR_RETURN(std::unique_ptr<SoftwareMetrics> metrics, |
| CreateSoftwareMetrics(params, *thread_manager)); |
| |
| ECCLESIA_RETURN_IF_ERROR( |
| ScheduleMetricsRead(metrics, params, *thread_manager)); |
| |
| return absl::WrapUnique( |
| new MetricCollector(std::move(metrics), std::move(thread_manager))); |
| } |
| |
| absl::StatusOr<std::unique_ptr<MetricCollector>> |
| MetricCollector::CreateForUnitTest(MetricCollector::Params& params) { |
| auto thread_manager = std::make_unique<MetricThreadManager>(params.clock); |
| |
| ECCLESIA_ASSIGN_OR_RETURN( |
| std::unique_ptr<SoftwareMetrics> metrics, |
| CreateSoftwareMetricsForUnitTest(params, *thread_manager)); |
| |
| ECCLESIA_RETURN_IF_ERROR( |
| ScheduleMetricsRead(metrics, params, *thread_manager)); |
| |
| return absl::WrapUnique( |
| new MetricCollector(std::move(metrics), std::move(thread_manager))); |
| } |
| std::unique_ptr<EmptyMetricCollector> EmptyMetricCollector::Create() { |
| return std::make_unique<EmptyMetricCollector>(); |
| } |
| |
| SoftwareMetricsValue EmptyMetricCollector::GetMetricValues() const { |
| LOG(WARNING) << "EmptyMetricCollector::GetMetricValues is " |
| "called. This will return an empty list."; |
| return SoftwareMetricsValue(); |
| } |
| |
| SocketStatStates EmptyMetricCollector::GetSocketStatMetricsValues() const { |
| LOG(WARNING) << "EmptyMetricCollector::GetSocketStatMetricsValues is " |
| "called. This will return an empty list."; |
| return SocketStatStates(); |
| } |
| |
| NetFilterStates EmptyMetricCollector::GetNetFilterValues() const { |
| LOG(WARNING) << "EmptyMetricCollector::GetNfValues is " |
| "called. This will return an empty list."; |
| return NetFilterStates(); |
| } |
| |
| nlohmann::json EmptyMetricCollector::GetSchedulerStats() const { |
| return nlohmann::json::parse("{\"Warning\": \"EmptyMetricCollector used.\"}"); |
| } |
| |
| nlohmann::json EmptyMetricCollector::ToJson() const { |
| return nlohmann::json::parse("{\"Warning\": \"EmptyMetricCollector used.\"}"); |
| } |
| } // namespace milotic_tlbmc |