Adding implementation for subscription manager This CL introduces the implementation of subscription manager and related classes. It includes the following features: - ResourceMonitor to monitor the sampling intervals for each resource. - ActiveSubscription to manage a single subscription. - SubscriptionManagerImpl which provide APIs to add/remove subscriptions. - Resample function which downsample data collected by the data source. - Unit tests to validate the implementation. In addition to above changes, added `SubscriptionPolicy` to SubscriptionParams to allow the subscriber to specify Resource to subscribe and whether subscription should apply to all resources or specific based on the given identifiers Tested: Unit Test #tlbmc #tlbmc-hft PiperOrigin-RevId: 760813662 Change-Id: I78f76b4ea6dcd6cfb07ff3489cbee4be38c7d53d
diff --git a/tlbmc/sensor_payload.proto b/tlbmc/sensor_payload.proto index c1aace3..00a84f1 100644 --- a/tlbmc/sensor_payload.proto +++ b/tlbmc/sensor_payload.proto
@@ -5,7 +5,10 @@ import "sensor_identifier.proto"; message HighFrequencySensorReading { - float float_reading = 1; + oneof reading { + float float_reading = 1; + // Add support for other types of readings. + } int64 timestamp_ns = 2; // Nanosecond timestamp of the readings. } @@ -15,5 +18,5 @@ } message HighFrequencySensorsReadingsBatch { - repeated HighFrequencySensorsReadings readings = 1; + repeated HighFrequencySensorsReadings high_frequency_sensors = 1; }
diff --git a/tlbmc/subscription/manager.h b/tlbmc/subscription/manager.h index 3d30b86..092d4c2 100644 --- a/tlbmc/subscription/manager.h +++ b/tlbmc/subscription/manager.h
@@ -1,43 +1,29 @@ #ifndef THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SUBSCRIPTION_MANAGER_H_ #define THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SUBSCRIPTION_MANAGER_H_ -#include <memory> +#include <string> #include "absl/functional/any_invocable.h" -#include "tlbmc/adapter/data_source.h" -#include "tlbmc/subscription/store.h" +#include "absl/status/status.h" +#include "absl/status/statusor.h" #include "tlbmc/types/payload.proto.h" #include "tlbmc/types/subscription_params.proto.h" -#include "tlbmc/scheduler/scheduler.h" namespace milotic_hft { class SubscriptionManager { public: - // A handle to a subscription that can be used to cancel the subscription. - class SubscriptionHandle { - public: - ~SubscriptionHandle() { subscription_->Cancel(); } - - private: - Subscription* subscription_; - }; + using SubscriptionId = std::string; virtual ~SubscriptionManager() = default; // Adds a new subscription to the manager. // `on_data_callback` will be called when new data is available to be exported // to the subscriber. - virtual void AddSubscription( + virtual absl::StatusOr<SubscriptionId> AddSubscription( const SubscriptionParams& params, - absl::AnyInvocable<void(Payload&&)> - on_data_callback) = 0; - - private: - // An adapter to the data source. - std::unique_ptr<DataSource> data_source_; - std::unique_ptr<SubscriptionStore> subscription_store_; - std::unique_ptr<milotic_tlbmc::TaskScheduler> task_scheduler_; + absl::AnyInvocable<void(Payload&&)> on_data_callback) = 0; + virtual absl::Status Unsubscribe(const SubscriptionId& sub_id) = 0; }; } // namespace milotic_hft
diff --git a/tlbmc/subscription_params.proto b/tlbmc/subscription_params.proto index 5fdaa1c..84b3a98 100644 --- a/tlbmc/subscription_params.proto +++ b/tlbmc/subscription_params.proto
@@ -4,6 +4,22 @@ import "identifier.proto"; +message SubscriptionPolicy { + enum ConfigurationType { + CONFIGURATION_TYPE_UNSPECIFIED = 0; + CONFIGURATION_TYPE_CONFIGURE_ALL_RESOURCES = 1; + CONFIGURATION_TYPE_CONFIGURE_SPECIFIC_RESOURCES = 2; + } + + enum ResourceType { + RESOURCE_TYPE_UNSPECIFIED = 0; + RESOURCE_TYPE_SENSOR = 1; + RESOURCE_TYPE_FRU = 2; + } + ConfigurationType configuration_type = 1; + ResourceType resource_type = 2; +} + message SubscriptionParams { // The interval between two consecutive batches of readings. // If not set, the default sampling interval is implementation defined. @@ -23,10 +39,10 @@ int32 export_interval_ms = 2; // The number of batches to export before cancelling the subscription. - // If set to 0, the subscription will remain active indefinitely. - // If not set, the subscription will be cancelled after the first batch. - int32 num_batches = 3 [default = 1]; + // If not set, the subscription will continue until explicitly cancelled. + int32 num_batches = 3; - // The identifier of the data source to subscribe to. - Identifier identifier = 4; + // The identifiers of the resources to subscribe to. + repeated Identifier identifiers = 4; + SubscriptionPolicy subscription_policy = 5; }