Adding implementation for subscription manager
This CL introduces the implementation of subscription manager and related classes.
It includes the following features:
- ResourceMonitor to monitor the sampling intervals for each resource.
- ActiveSubscription to manage a single subscription.
- SubscriptionManagerImpl which provide APIs to add/remove subscriptions.
- Resample function which downsample data collected by the data source.
- Unit tests to validate the implementation.
In addition to above changes, added `SubscriptionPolicy` to SubscriptionParams to allow the subscriber to specify Resource to subscribe and whether subscription should apply to all resources or specific based on the given identifiers
Tested:
Unit Test
#tlbmc
#tlbmc-hft
PiperOrigin-RevId: 760813662
Change-Id: I78f76b4ea6dcd6cfb07ff3489cbee4be38c7d53d
diff --git a/tlbmc/sensor_payload.proto b/tlbmc/sensor_payload.proto
index c1aace3..00a84f1 100644
--- a/tlbmc/sensor_payload.proto
+++ b/tlbmc/sensor_payload.proto
@@ -5,7 +5,10 @@
import "sensor_identifier.proto";
message HighFrequencySensorReading {
- float float_reading = 1;
+ oneof reading {
+ float float_reading = 1;
+ // Add support for other types of readings.
+ }
int64 timestamp_ns = 2; // Nanosecond timestamp of the readings.
}
@@ -15,5 +18,5 @@
}
message HighFrequencySensorsReadingsBatch {
- repeated HighFrequencySensorsReadings readings = 1;
+ repeated HighFrequencySensorsReadings high_frequency_sensors = 1;
}
diff --git a/tlbmc/subscription/manager.h b/tlbmc/subscription/manager.h
index 3d30b86..092d4c2 100644
--- a/tlbmc/subscription/manager.h
+++ b/tlbmc/subscription/manager.h
@@ -1,43 +1,29 @@
#ifndef THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SUBSCRIPTION_MANAGER_H_
#define THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SUBSCRIPTION_MANAGER_H_
-#include <memory>
+#include <string>
#include "absl/functional/any_invocable.h"
-#include "tlbmc/adapter/data_source.h"
-#include "tlbmc/subscription/store.h"
+#include "absl/status/status.h"
+#include "absl/status/statusor.h"
#include "tlbmc/types/payload.proto.h"
#include "tlbmc/types/subscription_params.proto.h"
-#include "tlbmc/scheduler/scheduler.h"
namespace milotic_hft {
class SubscriptionManager {
public:
- // A handle to a subscription that can be used to cancel the subscription.
- class SubscriptionHandle {
- public:
- ~SubscriptionHandle() { subscription_->Cancel(); }
-
- private:
- Subscription* subscription_;
- };
+ using SubscriptionId = std::string;
virtual ~SubscriptionManager() = default;
// Adds a new subscription to the manager.
// `on_data_callback` will be called when new data is available to be exported
// to the subscriber.
- virtual void AddSubscription(
+ virtual absl::StatusOr<SubscriptionId> AddSubscription(
const SubscriptionParams& params,
- absl::AnyInvocable<void(Payload&&)>
- on_data_callback) = 0;
-
- private:
- // An adapter to the data source.
- std::unique_ptr<DataSource> data_source_;
- std::unique_ptr<SubscriptionStore> subscription_store_;
- std::unique_ptr<milotic_tlbmc::TaskScheduler> task_scheduler_;
+ absl::AnyInvocable<void(Payload&&)> on_data_callback) = 0;
+ virtual absl::Status Unsubscribe(const SubscriptionId& sub_id) = 0;
};
} // namespace milotic_hft
diff --git a/tlbmc/subscription_params.proto b/tlbmc/subscription_params.proto
index 5fdaa1c..84b3a98 100644
--- a/tlbmc/subscription_params.proto
+++ b/tlbmc/subscription_params.proto
@@ -4,6 +4,22 @@
import "identifier.proto";
+message SubscriptionPolicy {
+ enum ConfigurationType {
+ CONFIGURATION_TYPE_UNSPECIFIED = 0;
+ CONFIGURATION_TYPE_CONFIGURE_ALL_RESOURCES = 1;
+ CONFIGURATION_TYPE_CONFIGURE_SPECIFIC_RESOURCES = 2;
+ }
+
+ enum ResourceType {
+ RESOURCE_TYPE_UNSPECIFIED = 0;
+ RESOURCE_TYPE_SENSOR = 1;
+ RESOURCE_TYPE_FRU = 2;
+ }
+ ConfigurationType configuration_type = 1;
+ ResourceType resource_type = 2;
+}
+
message SubscriptionParams {
// The interval between two consecutive batches of readings.
// If not set, the default sampling interval is implementation defined.
@@ -23,10 +39,10 @@
int32 export_interval_ms = 2;
// The number of batches to export before cancelling the subscription.
- // If set to 0, the subscription will remain active indefinitely.
- // If not set, the subscription will be cancelled after the first batch.
- int32 num_batches = 3 [default = 1];
+ // If not set, the subscription will continue until explicitly cancelled.
+ int32 num_batches = 3;
- // The identifier of the data source to subscribe to.
- Identifier identifier = 4;
+ // The identifiers of the resources to subscribe to.
+ repeated Identifier identifiers = 4;
+ SubscriptionPolicy subscription_policy = 5;
}