// blob: c79f3a1e9f6298a7c775f8b8c21ac0de309b24d0 [file] [log] [blame]
#ifndef THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SUBSCRIPTION_MANAGER_IMPL_H_
#define THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SUBSCRIPTION_MANAGER_IMPL_H_
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include "tlbmc/collection_entity.pb.h"
#include "tlbmc/feed_client_interface.h"
#include "tlbmc/service_messages.pb.h"
#include "one/offline_node_entities.pb.h"
#include "one/resolved_entities.pb.h"
#include "one/public_offline_node_entities.h"
#include "absl/base/attributes.h"
#include "absl/base/thread_annotations.h"
#include "absl/container/btree_set.h"
#include "absl/container/flat_hash_map.h"
#include "absl/functional/any_invocable.h"
#include "absl/hash/hash.h"
#include "absl/log/log.h"
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
#include "nlohmann/json_fwd.hpp"
#include "tlbmc/adapter/data_source.h"
#include "tlbmc/subscription/manager.h"
#include "identifier.pb.h"
#include "payload.pb.h"
#include "sensor_identifier.pb.h"
#include "sensor_payload.pb.h"
#include "subscription_params.pb.h"
#include "tlbmc/scheduler/scheduler.h"
namespace milotic_hft {
// Hash functor for milotic_hft::Identifier. It is passed as the hasher of the
// absl::flat_hash_map instances in this header that are keyed by Identifier.
// The hash is computed over the string absl::StrCat produces for the
// identifier, so it agrees with IdentifierEqual, which compares those strings.
struct IdentifierHash {
  size_t operator()(const milotic_hft::Identifier& identifier) const {
    const std::string rendered = absl::StrCat(identifier);
    return absl::HashOf(rendered);
  }
};
// Equality functor for milotic_hft::Identifier. Two identifiers are equal
// when absl::StrCat renders them to the same string, which is the same
// representation IdentifierHash hashes.
struct IdentifierEqual {
  bool operator()(const milotic_hft::Identifier& a,
                  const milotic_hft::Identifier& b) const {
    const std::string lhs = absl::StrCat(a);
    const std::string rhs = absl::StrCat(b);
    return lhs == rhs;
  }
};
// Forward declaration of ActiveSubscription (defined further down in this
// header). ResourceMonitor refers to it through
// std::shared_ptr<ActiveSubscription> before the full definition is visible.
class ActiveSubscription;
// Manages the sampling interval of a resource.
// It tracks the sampling interval of a resource and the subscribers
// of the resource. When a new subscriber is added, the effective sampling
// interval of the resource is updated to be the minimum of the sampling
// intervals of all subscribers.
// When a subscriber is removed, the effective sampling interval is updated to
// be the minimum of the sampling intervals of the remaining subscribers.
//
// The subscriber bookkeeping is guarded by `subscribers_mutex_`; the two
// "current" values are atomics so they can be read without taking that lock.
class ResourceMonitor {
 public:
  using SubscriptionId = SubscriptionManager::SubscriptionId;

  // Stores a copy of `identifier` and the raw `data_source` pointer.
  // NOTE(review): `data_source` is not owned here; presumably it must outlive
  // this monitor - confirm against the owner.
  explicit ResourceMonitor(const milotic_hft::Identifier& identifier,
                           DataSource* data_source)
      : identifier_(identifier),
        current_sampling_interval_ms_(0),
        // Before C++20 a default-constructed std::atomic<int> holds an
        // indeterminate value, so both atomics are initialised explicitly.
        current_max_batch_size_(0),
        data_source_(data_source) {}

  // Returns the identifier this monitor was constructed with.
  const milotic_hft::Identifier& GetIdentifier() const { return identifier_; }

  // Adds `subscription` to this monitor (see the class comment). Must be
  // called without `subscribers_mutex_` held. Defined out of line.
  absl::Status AddSubscriber(
      const std::shared_ptr<ActiveSubscription>& subscription)
      ABSL_LOCKS_EXCLUDED(subscribers_mutex_);

  // Removes the subscriber with the given `id` (see the class comment). Must
  // be called without `subscribers_mutex_` held. Defined out of line.
  absl::Status RemoveSubscriber(const SubscriptionId& id)
      ABSL_LOCKS_EXCLUDED(subscribers_mutex_);

  // Must be called without `subscribers_mutex_` held. Defined out of line;
  // presumably pushes the current settings to `data_source_` - TODO confirm.
  absl::Status ConfigureDataSource() ABSL_LOCKS_EXCLUDED(subscribers_mutex_);

  // Returns the current value of the sampling-interval atomic (0 until it is
  // first updated).
  int GetCurrentSamplingIntervalMs() const {
    return current_sampling_interval_ms_;
  }

  // Returns a JSON representation of this monitor. Defined out of line.
  nlohmann::json ToJson() const;

 protected:
  const milotic_hft::Identifier identifier_;
  std::atomic<int> current_sampling_interval_ms_;
  std::atomic<int> current_max_batch_size_;
  mutable absl::Mutex subscribers_mutex_;
  // Subscribers keyed by subscription id.
  absl::flat_hash_map<SubscriptionManager::SubscriptionId,
                      std::shared_ptr<ActiveSubscription>>
      subscribers_ ABSL_GUARDED_BY(subscribers_mutex_);
  // Ordered ascending, so begin() is the smallest element. Presumably holds
  // one sampling interval per subscriber - TODO confirm.
  absl::btree_multiset<int> lowest_sampling_interval_subscriptions_
      ABSL_GUARDED_BY(subscribers_mutex_);
  DataSource* data_source_;
  // Ordered descending (std::greater<>), so begin() is the largest element.
  // Presumably holds one max batch size per subscriber - TODO confirm.
  absl::btree_multiset<int, std::greater<>> max_batch_sizes_
      ABSL_GUARDED_BY(subscribers_mutex_);
};
// State for one subscription: its id and parameters, the callback that
// receives data, the callback invoked when the subscription ends, and
// non-owning pointers to the task scheduler and data source it uses.
//
// Derives from std::enable_shared_from_this, so instances are expected to be
// owned by std::shared_ptr (the containers in this header store them that
// way).
class ActiveSubscription
    : public std::enable_shared_from_this<ActiveSubscription> {
 public:
  using SubscriptionId = SubscriptionManager::SubscriptionId;

  // Defined out of line. `on_data_callback` receives payloads and
  // `on_subscription_ended` is what OnSubscriptionEnd() forwards to.
  // NOTE(review): `task_scheduler` and `data_source` are raw pointers;
  // presumably they must outlive this object - confirm against the caller.
  ActiveSubscription(
      const SubscriptionId& id, const milotic_hft::SubscriptionParams& params,
      absl::AnyInvocable<void(Payload&&)> on_data_callback,
      absl::AnyInvocable<void(const SubscriptionId&, const absl::Status&)>
          on_subscription_ended,
      milotic_tlbmc::TaskScheduler* task_scheduler, DataSource* data_source);

  // Cancels the task identified by `task_id_` on the scheduler (when a
  // scheduler was supplied) and logs the destruction.
  ~ActiveSubscription() {
    if (task_scheduler_ != nullptr) {
      task_scheduler_->Cancel(task_id_);
    }
    LOG(WARNING) << "ActiveSubscription " << id_ << " destroyed.";
  }

  // Defined out of line.
  void Begin();

  // Returns a copy of the subscription id.
  SubscriptionId GetId() const { return id_; }

  // Returns the parameters this subscription was created with.
  const milotic_hft::SubscriptionParams& GetParams() const { return params_; }

  // Defined out of line.
  void DeliverData(Payload&& data);

  // True while `batches_remaining_` is non-zero; this includes the -1
  // "unlimited" value.
  bool IsActive() const { return batches_remaining_ != 0; }

  // Forwards `id` and `status` to the subscription-ended callback.
  // Invoking an empty absl::AnyInvocable is undefined behaviour, so the call
  // is skipped when no callback was supplied.
  void OnSubscriptionEnd(const SubscriptionId& id, const absl::Status& status) {
    if (!on_subscription_ended_callback_) {
      return;
    }
    on_subscription_ended_callback_(id, status);
  }

  // Returns a JSON representation of this subscription. Defined out of line.
  nlohmann::json ToJson() const;

 private:
  std::atomic<int> task_id_ = -1;  // -1 means no task is scheduled.
  const SubscriptionId id_;
  const milotic_hft::SubscriptionParams params_;
  absl::AnyInvocable<void(Payload&&)> on_data_callback_;
  absl::AnyInvocable<void(const SubscriptionId&, const absl::Status&)>
      on_subscription_ended_callback_;
  std::atomic<int> batches_remaining_ = -1;  // -1 means unlimited.
  // Maps each identifier to an absl::Time; by its name, the time it was last
  // sampled.
  absl::flat_hash_map<milotic_hft::Identifier, absl::Time, IdentifierHash,
                      IdentifierEqual>
      identifier_to_last_sampled_time_
          ABSL_GUARDED_BY(identifier_to_last_sampled_time_ms_mutex_);
  mutable absl::Mutex identifier_to_last_sampled_time_ms_mutex_;
  milotic_tlbmc::TaskScheduler* task_scheduler_;
  DataSource* data_source_;
};
// Concrete SubscriptionManager. Instances are built through the static
// Create() factories because the constructors are private.
//
// NOTE(review): Create() uses absl::StatusOr, but this header does not include
// "absl/status/statusor.h" directly; it currently relies on a transitive
// include.
class SubscriptionManagerImpl : public SubscriptionManager {
 public:
  using SubscriptionId = SubscriptionManager::SubscriptionId;

  // Settings for the feed-client flavour of Create().
  struct Options {
    absl::Duration get_policies_interval = absl::Minutes(15);
    // Has no default; callers must set it explicitly.
    platforms_syshealth::collection::feed::EndpointIdentifier::EndpointType
        endpoint_type ABSL_REQUIRE_EXPLICIT_INIT;
    // When non-empty, the offline node entity information is read from this
    // path; otherwise the reader is called without a path.
    std::string offline_node_entities_path;
  };

  ~SubscriptionManagerImpl() override;

  // Creates a subscription manager.
  static std::unique_ptr<SubscriptionManagerImpl> Create(
      std::unique_ptr<DataSource> data_source) {
    // The constructor is private, hence `new` + WrapUnique.
    auto* manager = new SubscriptionManagerImpl(std::move(data_source));
    return absl::WrapUnique(manager);
  }

  // Creates a subscription manager with feed client.
  //
  // Reads the offline node entity information (from
  // `options.offline_node_entities_path` when it is set) and uses it to fill
  // in the NodeEntityId handed to the constructor. Returns nullptr, after
  // logging an error, when that information cannot be read.
  static std::unique_ptr<SubscriptionManagerImpl> Create(
      std::unique_ptr<DataSource> data_source,
      platforms_syshealth::collection::feed::FeedClientInterface* feed_client,
      const Options& options) {
    namespace feed = platforms_syshealth::collection::feed;

    absl::StatusOr<
        production_msv::node_entities_proto::OfflineNodeEntityInformation>
        node_entity_info;
    if (options.offline_node_entities_path.empty()) {
      node_entity_info =
          production_msv::node_entities::ReadOfflineNodeEntityInformation();
    } else {
      node_entity_info =
          production_msv::node_entities::ReadOfflineNodeEntityInformation(
              options.offline_node_entities_path);
    }
    if (!node_entity_info.ok()) {
      LOG(ERROR) << "Failed to read offline node entity information: "
                 << node_entity_info.status();
      return nullptr;
    }

    const auto& resolved_config = node_entity_info->resolved_config();
    const auto& entity_tag = node_entity_info->entity_tag();
    const auto& entities = resolved_config.entities();

    feed::NodeEntityId node_entity_id;
    node_entity_id.set_machine_name(resolved_config.machine_name());
    node_entity_id.set_entity_tag(entity_tag);
    // The hostname is only set when the resolved config has an entry for
    // this entity tag.
    const auto entity_it = entities.find(entity_tag);
    if (entity_it != entities.end()) {
      node_entity_id.set_hostname(entity_it->second.hostname());
    }

    auto* manager = new SubscriptionManagerImpl(
        std::move(data_source), feed_client, options, node_entity_id);
    return absl::WrapUnique(manager);
  }

  // SubscriptionManager override, defined out of line. Must be called without
  // `active_subscriptions_mutex_` held.
  absl::StatusOr<SubscriptionId> AddSubscription(
      const SubscriptionParams& params,
      absl::AnyInvocable<void(Payload&&)> on_data_callback) override
      ABSL_LOCKS_EXCLUDED(active_subscriptions_mutex_);

  // SubscriptionManager override, defined out of line. Must be called without
  // `active_subscriptions_mutex_` held.
  absl::Status Unsubscribe(const SubscriptionId& sub_id) override
      ABSL_LOCKS_EXCLUDED(active_subscriptions_mutex_);

  // Returns a JSON representation of the manager. Defined out of line.
  nlohmann::json ToJson() const override;

 private:
  // Constructors are private; use Create().
  explicit SubscriptionManagerImpl(std::unique_ptr<DataSource> data_source);
  SubscriptionManagerImpl(
      std::unique_ptr<DataSource> data_source,
      platforms_syshealth::collection::feed::FeedClientInterface* feed_client,
      const Options& options,
      const platforms_syshealth::collection::feed::NodeEntityId&
          node_entity_id);

  // Defined out of line.
  void PullAgentPoliciesAndUpdateSubscriptions();

  // Defined out of line.
  SubscriptionId GenerateSubscriptionId();

  // Defined out of line. Must be called with neither
  // `active_subscriptions_mutex_` nor `resource_monitors_mutex_` held.
  void OnSubscriptionEnd(const SubscriptionId& subscription_id,
                         const absl::Status& status)
      ABSL_LOCKS_EXCLUDED(active_subscriptions_mutex_,
                          resource_monitors_mutex_);

  std::atomic<size_t> next_subscription_id_;
  std::unique_ptr<milotic_tlbmc::TaskScheduler> task_scheduler_;

  mutable absl::Mutex active_subscriptions_mutex_;
  // Active subscriptions keyed by subscription id.
  absl::flat_hash_map<SubscriptionId, std::shared_ptr<ActiveSubscription>>
      active_subscriptions_ ABSL_GUARDED_BY(active_subscriptions_mutex_);

  std::unique_ptr<DataSource> data_source_;

  mutable absl::Mutex resource_monitors_mutex_;
  // One ResourceMonitor per identifier.
  absl::flat_hash_map<milotic_hft::Identifier, std::shared_ptr<ResourceMonitor>,
                      IdentifierHash, IdentifierEqual>
      resource_monitors_ ABSL_GUARDED_BY(resource_monitors_mutex_);

  // Not owned; stays nullptr unless a constructor sets it.
  platforms_syshealth::collection::feed::FeedClientInterface* feed_client_ =
      nullptr;
  absl::Duration get_policies_interval_;
  std::atomic<int> get_policies_task_id_ = -1;

  // Two maps between policy ids (strings) and subscription ids, one per
  // direction; both share the lock that guards `active_subscriptions_`.
  absl::flat_hash_map<std::string, SubscriptionId> policy_to_subscription_ids_
      ABSL_GUARDED_BY(active_subscriptions_mutex_);
  absl::flat_hash_map<SubscriptionId, std::string> subscription_to_policy_ids_
      ABSL_GUARDED_BY(active_subscriptions_mutex_);

  const platforms_syshealth::collection::feed::EndpointIdentifier::EndpointType
      endpoint_type_ = platforms_syshealth::collection::feed::
          EndpointIdentifier::ENDPOINT_TYPE_UNSPECIFIED;
  platforms_syshealth::collection::feed::NodeEntityId node_entity_id_;
};
} // namespace milotic_hft
#endif // THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SUBSCRIPTION_MANAGER_IMPL_H_