blob: 45a996b4db2b04fbca38a8a50f39786a420bc8ad [file] [log] [blame]
#include "tlbmc/metrics/software_metrics.h"
#include <cstdint>
#include <memory>
#include <sstream>
#include <string>
#include "google/protobuf/duration.pb.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/functional/any_invocable.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/numbers.h"
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "absl/synchronization/mutex.h"
#include "g3/macros.h"
#include "nlohmann/json.hpp"
#include "resource.pb.h"
#include "software_metrics.pb.h"
#include "tlbmc/time/time.h"
#include "tlbmc/utils/command_executor.h"
namespace milotic_tlbmc {
SoftwareMetricsAttributesStatic SoftwareMetrics::CreateStaticAttributes(
const SoftwareMetricsConfig& software_metrics_config) {
SoftwareMetricsAttributesStatic metric_attributes_static;
*metric_attributes_static.mutable_software_metrics_config() =
software_metrics_config;
return metric_attributes_static;
}
absl::StatusOr<SocketStatStates> SoftwareMetrics::UpdateSocketStats() {
ECCLESIA_ASSIGN_OR_RETURN(std::string cmd_output,
executor_->Execute(*kSocketStatCmd));
SocketStatStates socket_states;
*socket_states.mutable_timestamp() = Now();
std::stringstream ss(cmd_output);
std::string line;
while (std::getline(ss, line)) {
if (line.empty()) {
continue;
}
std::stringstream line_stream(line);
std::string state_str;
int port_num;
SocketStatState* state;
if (line_stream >> state_str >> port_num) {
if (ss_port_set_.contains(port_num)) {
state = &(*socket_states.mutable_states())[port_num];
if (state_str == "LISTEN") {
state->set_listen(state->listen() + 1);
} else if (state_str == "ESTAB") {
state->set_established(state->established() + 1);
} else if (state_str == "SYN-SENT") {
state->set_syn_sent(state->syn_sent() + 1);
} else if (state_str == "SYN-RECV") {
state->set_syn_received(state->syn_received() + 1);
} else if (state_str == "FIN-WAIT-1") {
state->set_fin_wait_one(state->fin_wait_one() + 1);
} else if (state_str == "FIN-WAIT-2") {
state->set_fin_wait_two(state->fin_wait_two() + 1);
} else if (state_str == "TIME-WAIT") {
state->set_time_wait(state->time_wait() + 1);
} else if (state_str == "CLOSE-WAIT") {
state->set_close_wait(state->close_wait() + 1);
} else if (state_str == "LAST-ACK") {
state->set_last_ack(state->last_ack() + 1);
} else if (state_str == "CLOSING") {
state->set_closing(state->closing() + 1);
}
}
}
}
if (socket_states.states_size() > 0) {
return socket_states;
}
return absl::NotFoundError(
"No matching NFT rules found for configured ports.");
}
absl::StatusOr<NetFilterStates> SoftwareMetrics::UpdateNetFilterStats() {
ECCLESIA_ASSIGN_OR_RETURN(absl::string_view cmd_output,
executor_->Execute(*kNetFilterCmd));
NetFilterStates nf_states;
*nf_states.mutable_timestamp() = Now();
// Parse JSON output
nlohmann::json nft_json = nlohmann::json::parse(cmd_output, nullptr, false);
if (nft_json.is_discarded()) {
return absl::NotFoundError("Failed to parse JSON output from nft.");
}
if (!nft_json.contains("nftables") || !nft_json["nftables"].is_array()) {
return absl::NotFoundError(
"nftables array not found in JSON output from nft.");
}
// Iterate through the rules and extract data for the ports we need.
for (const nlohmann::json& item : nft_json["nftables"]) {
if (!item.contains("rule") || !item["rule"].is_object()) {
continue;
}
const nlohmann::json& rule = item["rule"];
if (!rule.contains("comment") || !rule["comment"].is_string()) {
continue;
}
// Ensure the comment refers to one of our counter rules
absl::string_view port_str = rule["comment"].get<absl::string_view>();
if (!absl::ConsumePrefix(&port_str, "tcp-server-") ||
!absl::ConsumeSuffix(&port_str, "-synack")) {
continue;
}
// Make sure we have an actual port and we are configured to collect this.
int32_t port_id;
if (!absl::SimpleAtoi(port_str, &port_id) ||
!nf_port_set_.contains(port_id)) {
continue;
}
// This rule is for one of our target ports. Now extract the packet count.
if (!rule.contains("expr") || !rule["expr"].is_array() ||
rule["expr"].size() != 1) {
LOG(WARNING) << "Rule for port " << port_id
<< " has unexpected expr structure.";
continue;
}
const nlohmann::json& counter_expr = rule["expr"][0];
if (!counter_expr.contains("counter") ||
!counter_expr["counter"].is_object() ||
!counter_expr["counter"].contains("packets") ||
!counter_expr["counter"]["packets"].is_number()) {
LOG(WARNING) << "Rule for port " << port_id
<< " is missing packet counter.";
continue;
}
NetFilterState* state = nf_states.add_states();
state->set_port(port_id);
state->set_num_connections(
counter_expr["counter"]["packets"].get<uint64_t>());
}
if (nf_states.states_size() > 0) {
return nf_states;
}
return absl::NotFoundError(
"No matching NFT rules found for configured ports.");
}
void SoftwareMetrics::StoreData(SoftwareMetricsValue* metric_data) {
// NOLINTNEXTLINE: Yocto's Abseil is old and only supports pointer type
absl::MutexLock lock(&metric_data_mutex_);
values_ = *metric_data;
}
absl::StatusOr<std::unique_ptr<SoftwareMetrics>>
SoftwareMetrics::CreateWithExecutorForUnitTest(
const SoftwareMetricsConfig& config,
absl::flat_hash_map<std::string, absl::StatusOr<std::string>>&
command_map) {
SoftwareMetricsAttributesStatic software_attributes_static =
CreateStaticAttributes(config);
std::unique_ptr<SoftwareMetrics> ptr(
new SoftwareMetrics(software_attributes_static));
return ptr;
}
absl::StatusOr<std::unique_ptr<SoftwareMetrics>> SoftwareMetrics::Create(
const SoftwareMetricsConfig& config) {
SoftwareMetricsAttributesStatic software_attributes_static =
CreateStaticAttributes(config);
std::unique_ptr<SoftwareMetrics> ptr(
new SoftwareMetrics(software_attributes_static));
return ptr;
}
void SoftwareMetrics::RefreshOnce(absl::AnyInvocable<void()> callback) {
SoftwareMetricsValue new_values;
{
// NOLINTNEXTLINE: Yocto's Abseil is old and only supports pointer type
absl::MutexLock lock(&metric_data_mutex_);
new_values = values_;
}
bool update_required = false;
if (!ss_port_set_.empty()) {
absl::StatusOr<SocketStatStates> socket_stats = UpdateSocketStats();
if (socket_stats.ok()) {
*new_values.mutable_socket_stat_state() = *socket_stats;
update_required = true;
} else {
LOG(ERROR) << "Failed to update socket stats: " << socket_stats.status();
}
}
if (!nf_port_set_.empty()) {
absl::StatusOr<NetFilterStates> netfilter_stats = UpdateNetFilterStats();
if (netfilter_stats.ok()) {
*new_values.mutable_netfilter_state() = *netfilter_stats;
update_required = true;
} else {
LOG(ERROR) << "Failed to update netfilter stats: "
<< netfilter_stats.status();
}
}
// We start off using the last successful set of values read for software
// metrics, if the update fails, we will not replace the stored values
// in memory. If any are successful we need to copy them back in.
if (update_required) {
StoreData(&new_values);
}
if (callback) {
callback();
}
}
} // namespace milotic_tlbmc