|  | /* | 
|  | * Copyright 2023 Google LLC | 
|  | * | 
|  | * Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | * you may not use this file except in compliance with the License. | 
|  | * You may obtain a copy of the License at | 
|  | * | 
|  | *      http://www.apache.org/licenses/LICENSE-2.0 | 
|  | * | 
|  | * Unless required by applicable law or agreed to in writing, software | 
|  | * distributed under the License is distributed on an "AS IS" BASIS, | 
|  | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | * See the License for the specific language governing permissions and | 
|  | * limitations under the License. | 
|  | */ | 
|  |  | 
|  | #include "subscription_impl.h" | 
|  |  | 
|  | #include <cstddef> | 
|  | #include <ctime> | 
|  | #include <functional> | 
|  | #include <memory> | 
|  | #include <string> | 
|  | #include <unordered_set> | 
|  | #include <utility> | 
|  | #include <vector> | 
|  |  | 
|  | #include "absl/algorithm/container.h" | 
|  | #include "absl/base/thread_annotations.h" | 
|  | #include "absl/container/flat_hash_map.h" | 
|  | #include "absl/container/flat_hash_set.h" | 
|  | #include "absl/hash/hash.h" | 
|  | #include "absl/log/die_if_null.h" | 
|  | #include "absl/log/log.h" | 
|  | #include "absl/status/status.h" | 
|  | #include "absl/status/statusor.h" | 
|  | #include "absl/strings/str_cat.h" | 
|  | #include "absl/strings/string_view.h" | 
|  | #include "absl/synchronization/mutex.h" | 
|  | #include "absl/time/clock.h" | 
|  | #include "absl/time/time.h" | 
|  | #include "absl/types/span.h" | 
|  | #include "subscription.h" | 
|  | // TODO(rahulkpr): Enable predicates after fixing gbmcweb copybara | 
|  | // #include | 
|  | // "redfish/redpath/definitions/query_predicates/predicates.h" | 
|  | #include "g3/macros.h" | 
|  | #include "nlohmann/json.hpp" | 
|  |  | 
|  | namespace ecclesia { | 
|  |  | 
|  | namespace { | 
|  |  | 
// Redfish properties to parse in Subscription request.
// CreateSubscription looks these up as nested keys: request["Oem"]["Google"]
// holds "Triggers" and, optionally, "LastEventId".
constexpr absl::string_view kPropertyOem = "Oem";
constexpr absl::string_view kPropertyGoogle = "Google";
constexpr absl::string_view kPropertyTriggers = "Triggers";
constexpr absl::string_view kPropertyLastEventId = "LastEventId";
|  |  | 
|  | std::string PropertyNotPopulatedError(absl::string_view property) { | 
|  | return absl::StrCat(property, " not populated"); | 
|  | } | 
|  |  | 
|  | // Constructs a RedfishEvent from `query_response`. | 
|  | void BuildRedfishEvent(const Trigger &trigger, | 
|  | const nlohmann::json &query_response, | 
|  | nlohmann::json &redfish_event) { | 
|  | // Generate Event Id | 
|  | size_t event_id = absl::HashOf(absl::Now(), query_response.dump()); | 
|  | redfish_event["EventId"] = absl::StrCat(event_id); | 
|  |  | 
|  | // Get ISO extended string for timestamp | 
|  | redfish_event["EventTimestamp"] = | 
|  | absl::FormatTime(absl::RFC3339_full, absl::Now(), absl::UTCTimeZone()); | 
|  |  | 
|  | // Add origin of condition. | 
|  | redfish_event["OriginOfCondition"] = query_response; | 
|  |  | 
|  | // Add trigger id as context. | 
|  | redfish_event["Context"] = trigger.id; | 
|  | } | 
|  |  | 
|  | // Constructs RedfishEvents for redfish resources meeting trigger criteria. | 
|  | void BuildRedfishEventsForTrigger( | 
|  | const Trigger &trigger, | 
|  | const absl::flat_hash_map<std::string, nlohmann::json> &uri_to_response, | 
|  | std::vector<nlohmann::json> &events) { | 
|  | for (const auto &[uri, response] : uri_to_response) { | 
|  | // Skip event if URI is not part of the trigger origin resources. | 
|  | if (!trigger.origin_resources.contains(uri)) { | 
|  | continue; | 
|  | } | 
|  |  | 
|  | // TODO(rahulkpr): Enable predicates after fixing gbmcweb copybara | 
|  | // // Apply predicate in trigger to check if event meets the user defined | 
|  | // // criteria. | 
|  | // if (!trigger.predicate.empty()) { | 
|  | //   absl::StatusOr<bool> predicate_result = | 
|  | //       ApplyPredicateRule(response, {.predicate = trigger.predicate, | 
|  | //                                     .node_index = 0, | 
|  | //                                     .node_set_size = 1}); | 
|  | //   if (!predicate_result.ok()) { | 
|  | //     LOG(ERROR) << "Cannot apply trigger predicate rule for URI: " << uri | 
|  | //                << " error: " << predicate_result.status(); | 
|  | //     continue; | 
|  | //   } | 
|  |  | 
|  | //   if (!*predicate_result) { | 
|  | //     // Skip event if predicate is not satisfied. | 
|  | //     continue; | 
|  | //   } | 
|  | // } | 
|  |  | 
|  | // Construct RedfishEvent. | 
|  | BuildRedfishEvent(trigger, response, events.emplace_back()); | 
|  | } | 
|  | } | 
|  |  | 
|  | // Constructs RedfishEvents for a given subscription. | 
|  | std::vector<nlohmann::json> BuildRedfishEventsForSubscription( | 
|  | const SubscriptionContext &subscription_context, | 
|  | const absl::flat_hash_map<std::string, nlohmann::json> &uri_to_response) { | 
|  | std::vector<nlohmann::json> events; | 
|  | for (const auto &[id, trigger] : subscription_context.id_to_triggers) { | 
|  | BuildRedfishEventsForTrigger(trigger, uri_to_response, events); | 
|  | } | 
|  | return events; | 
|  | } | 
|  |  | 
|  | // Captures asynchronous responses received on subscribing each OriginResource. | 
|  | // Invokes `OnAsyncSubscribeComplete` when all OriginResources are subscribed. | 
|  | class AsyncSubscribeResponse { | 
|  | public: | 
|  | using OnAsyncSubscribeComplete = std::function<void( | 
|  | const absl::Status &, | 
|  | const absl::flat_hash_map<EventSourceId, absl::flat_hash_set<std::string>> | 
|  | &)>; | 
|  |  | 
|  | AsyncSubscribeResponse( | 
|  | const absl::flat_hash_set<std::string> &unique_resources_to_subscribe, | 
|  | OnAsyncSubscribeComplete &&on_async_subscribe_complete) | 
|  | : async_subscribe_complete_(std::move(on_async_subscribe_complete)), | 
|  | status_(absl::OkStatus()), | 
|  | responses_pending_(unique_resources_to_subscribe.begin(), | 
|  | unique_resources_to_subscribe.end()) {} | 
|  |  | 
|  | void ProcessResponse(absl::string_view uri, const absl::Status &status, | 
|  | const std::vector<EventSourceId> &event_source_ids) { | 
|  | absl::MutexLock lock(&mutex_); | 
|  |  | 
|  | // Check if we are expecting a response for the given URI. | 
|  | if (responses_pending_.erase(uri) == 0) { | 
|  | status_ = absl::InternalError( | 
|  | absl::StrCat("Received unexpected response for URI: ", uri)); | 
|  | return; | 
|  | } | 
|  |  | 
|  | if (!status_.ok()) { | 
|  | // Exit without processing response if even one subscription request has | 
|  | // failed. We don't want to have partial subscriptions in the system. | 
|  | return; | 
|  | } | 
|  |  | 
|  | if (status_.ok() && !status.ok()) { | 
|  | LOG(ERROR) << "Subscribe request to URI failed. URI: " << uri | 
|  | << " status: " << status; | 
|  | status_ = status; | 
|  | } | 
|  |  | 
|  | for (const EventSourceId &event_source_id : event_source_ids) { | 
|  | event_source_id_to_origin_resources_[event_source_id].insert( | 
|  | std::string(uri)); | 
|  | } | 
|  |  | 
|  | if (responses_pending_.empty() || !status_.ok()) { | 
|  | async_subscribe_complete_(status_, event_source_id_to_origin_resources_); | 
|  | } | 
|  | } | 
|  |  | 
|  | private: | 
|  | // Callback to invoke when all OriginResources are subscribed. | 
|  | const OnAsyncSubscribeComplete async_subscribe_complete_; | 
|  |  | 
|  | absl::Mutex mutex_; | 
|  |  | 
|  | // EventSourceIds returned from | 
|  | absl::flat_hash_map<EventSourceId, absl::flat_hash_set<std::string>> | 
|  | event_source_id_to_origin_resources_ ABSL_GUARDED_BY(mutex_); | 
|  | absl::Status status_ ABSL_GUARDED_BY(mutex_); | 
|  |  | 
|  | // Tracks async subscribe responses received from Subscription Backend. | 
|  | // Response can be valid or invalid. | 
|  | absl::flat_hash_set<std::string> responses_pending_ ABSL_GUARDED_BY(mutex_); | 
|  | }; | 
|  |  | 
|  | // Aggregates responses received from querying multiple URIs asynchronously. | 
|  | // Invokes `done_callback` when all OriginResources are queried. | 
|  | class AggregatedQueryResponse { | 
|  | public: | 
|  | AggregatedQueryResponse( | 
|  | const absl::flat_hash_set<std::string> &unique_resources_to_query, | 
|  | std::function<void(absl::flat_hash_map<std::string, nlohmann::json>)> | 
|  | done_callback) | 
|  | : responses_pending_(unique_resources_to_query.begin(), | 
|  | unique_resources_to_query.end()), | 
|  | done_callback_(std::move(done_callback)) {} | 
|  |  | 
|  | void AddQueryResponse(absl::string_view uri, const absl::Status &status, | 
|  | const nlohmann::json &query_response) { | 
|  | absl::MutexLock lock(&query_mutex_); | 
|  | if (responses_pending_.erase(uri) == 0) { | 
|  | LOG(ERROR) << "Received unexpected response for URI: " << uri; | 
|  | return; | 
|  | } | 
|  |  | 
|  | if (!status.ok()) { | 
|  | LOG(ERROR) << "Cannot create RedfishEvent for uri: " << uri | 
|  | << " error: " << status.message(); | 
|  | return; | 
|  | } | 
|  | uri_to_response_[uri] = query_response; | 
|  |  | 
|  | if (responses_pending_.empty()) { | 
|  | done_callback_(uri_to_response_); | 
|  | } | 
|  | } | 
|  |  | 
|  | private: | 
|  | absl::Mutex query_mutex_; | 
|  | absl::flat_hash_set<std::string> responses_pending_ | 
|  | ABSL_GUARDED_BY(query_mutex_); | 
|  | absl::flat_hash_map<std::string, nlohmann::json> uri_to_response_ | 
|  | ABSL_GUARDED_BY(query_mutex_); | 
|  | std::function<void(absl::flat_hash_map<std::string, nlohmann::json>)> | 
|  | done_callback_; | 
|  | }; | 
|  |  | 
|  | // Concrete implementation of SubscriptionService. | 
|  | // Thread safe | 
|  | class SubscriptionServiceImpl | 
|  | : public SubscriptionService, | 
|  | public std::enable_shared_from_this<SubscriptionServiceImpl> { | 
|  | public: | 
// Takes ownership of the subscription backend, the subscription store and the
// event store. Each argument is wrapped in ABSL_DIE_IF_NULL, so all three must
// be non-null.
SubscriptionServiceImpl(
    std::unique_ptr<SubscriptionBackend> subscription_backend,
    std::unique_ptr<SubscriptionStore> subscription_store,
    std::unique_ptr<EventStore> event_store)
    : subscription_backend_(
          ABSL_DIE_IF_NULL(std::move(subscription_backend))),
      subscription_store_(ABSL_DIE_IF_NULL(std::move(subscription_store))),
      event_store_(ABSL_DIE_IF_NULL(std::move(event_store))) {}
|  |  | 
|  | // Creates Event Subscription. | 
|  | // Invokes `on_subscribe_callback` with unique event subscription id after | 
|  | // successful subscription or with an error status if subscription | 
|  | // `request` is not formatted per the EventDestination schema. | 
|  | // The parameter `on_event_callback` is stored with the subscription to be | 
|  | // invoked to dispatch events to the subscriber. | 
|  | void CreateSubscription( | 
|  | const nlohmann::json &request, | 
|  | const std::unordered_set<std::string> &peer_privileges, | 
|  | std::function<void(const absl::StatusOr<SubscriptionId> &)> | 
|  | on_subscribe_callback, | 
|  | std::function<void(const nlohmann::json &)> on_event_callback) override { | 
|  | // Check if Oem, Google, and Triggers exist | 
|  | auto find_oem = request.find(kPropertyOem); | 
|  | if (find_oem == request.end()) { | 
|  | on_subscribe_callback( | 
|  | absl::InvalidArgumentError(PropertyNotPopulatedError(kPropertyOem))); | 
|  | return; | 
|  | } | 
|  |  | 
|  | auto find_google = find_oem->find(kPropertyGoogle); | 
|  | if (find_google == find_oem->end()) { | 
|  | on_subscribe_callback(absl::InvalidArgumentError( | 
|  | PropertyNotPopulatedError(kPropertyGoogle))); | 
|  | return; | 
|  | } | 
|  |  | 
|  | auto find_triggers = find_google->find(kPropertyTriggers); | 
|  | if (find_triggers == find_google->end() || find_triggers->empty()) { | 
|  | on_subscribe_callback(absl::InvalidArgumentError( | 
|  | PropertyNotPopulatedError(kPropertyTriggers))); | 
|  | return; | 
|  | } | 
|  |  | 
|  | // Maps trigger id to trigger object. | 
|  | absl::flat_hash_map<std::string, Trigger> trigger_id_to_trigger_obj; | 
|  |  | 
|  | // Store unique Redfish resources to subscribe. | 
|  | absl::flat_hash_set<std::string> unique_resources_to_subscribe; | 
|  |  | 
|  | // Parse Triggers | 
|  | for (const auto &trigger : *find_triggers) { | 
|  | if (trigger.is_null()) { | 
|  | on_subscribe_callback( | 
|  | absl::InvalidArgumentError("Trigger not populated")); | 
|  | return; | 
|  | } | 
|  |  | 
|  | absl::StatusOr<Trigger> trigger_obj = Trigger::Create(trigger); | 
|  | if (!trigger_obj.ok()) { | 
|  | on_subscribe_callback(trigger_obj.status()); | 
|  | return; | 
|  | } | 
|  |  | 
|  | // Update trigger id associations. | 
|  | trigger_id_to_trigger_obj.insert_or_assign(trigger_obj->id, *trigger_obj); | 
|  |  | 
|  | for (const std::string &origin_resource : trigger_obj->origin_resources) { | 
|  | // Skip subscribing if mask is set. | 
|  | if (trigger_obj->mask) continue; | 
|  | unique_resources_to_subscribe.insert(origin_resource); | 
|  | } | 
|  | } | 
|  |  | 
|  | auto on_subscribe = std::make_shared< | 
|  | std::function<void(const absl::StatusOr<SubscriptionId> &)>>( | 
|  | std::move(on_subscribe_callback)); | 
|  | auto on_async_subscribe_complete = | 
|  | [on_subscribe(on_subscribe), google_obj(*find_google), | 
|  | on_event_callback(std::move(on_event_callback)), | 
|  | trigger_id_to_trigger_obj, request, | 
|  | this, privileges = peer_privileges]( | 
|  | const absl::Status &status, | 
|  | const absl::flat_hash_map<EventSourceId, | 
|  | absl::flat_hash_set<std::string>> | 
|  | &event_source_to_origin_resources) mutable { | 
|  | if (!status.ok()) { | 
|  | (*on_subscribe)(status); | 
|  | // TODO(rahulkpr): Add logic to clean up partial subscription. | 
|  | return; | 
|  | } | 
|  |  | 
|  | // If subscriber requested to stream events queued after given | 
|  | // `last_event_id`, pull the events from the event store and dispatch. | 
|  | if (auto find_last_event_id = google_obj.find(kPropertyLastEventId); | 
|  | find_last_event_id != google_obj.end()) { | 
|  | // Dispatch events since last event id. | 
|  | absl::c_for_each( | 
|  | event_store_->GetEventsSince(find_last_event_id->get<size_t>()), | 
|  | on_event_callback); | 
|  | } | 
|  |  | 
|  | // Create subscription id. | 
|  | SubscriptionId subscription_id( | 
|  | absl::HashOf(absl::Now(), request.dump())); | 
|  |  | 
|  | // Build and store subscription context. | 
|  | auto subscription_context = std::make_unique<SubscriptionContext>( | 
|  | subscription_id, event_source_to_origin_resources, | 
|  | trigger_id_to_trigger_obj, std::move(on_event_callback)); | 
|  | subscription_context->privileges = privileges; | 
|  |  | 
|  | // Add subscription to subscription store. | 
|  | absl::Status new_subscription_status = | 
|  | subscription_store_->AddNewSubscription( | 
|  | std::move(subscription_context)); | 
|  |  | 
|  | if (!new_subscription_status.ok()) { | 
|  | (*on_subscribe)(new_subscription_status); | 
|  | // TODO(rahulkpr): Add logic to clean up partial subscription. | 
|  | return; | 
|  | } | 
|  |  | 
|  | // If we reach this point, we have created a subscription | 
|  | // successfully. Invoke the callback to complete the subscription | 
|  | // sequence. | 
|  | (*on_subscribe)(subscription_id); | 
|  | }; | 
|  |  | 
|  | // Now we invoke subscribe on the subscription backend. This should send | 
|  | // subscription requests to each event source which will return one or more | 
|  | // event source ids. | 
|  | auto async_subscribe_response = std::make_shared<AsyncSubscribeResponse>( | 
|  | unique_resources_to_subscribe, std::move(on_async_subscribe_complete)); | 
|  | for (const std::string &origin_resource : unique_resources_to_subscribe) { | 
|  | absl::Status status = subscription_backend_->Subscribe( | 
|  | origin_resource, | 
|  | [async_subscribe_response, origin_resource]( | 
|  | const absl::Status &status, | 
|  | const std::vector<EventSourceId> &event_source_ids) { | 
|  | async_subscribe_response->ProcessResponse(origin_resource, status, | 
|  | event_source_ids); | 
|  | }, | 
|  | peer_privileges); | 
|  | if (!status.ok()) { | 
|  | (*on_subscribe)(status); | 
|  | return; | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
// Removes the subscription identified by `subscription_id` from the
// subscription store. Nothing is reported back to the caller.
void DeleteSubscription(const SubscriptionId &subscription_id) override {
  subscription_store_->DeleteSubscription(subscription_id);
}
|  |  | 
// Retrieves all subscriptions managed by the service.
// Not implemented yet: logs an error and always returns an empty span.
absl::Span<const SubscriptionContext> GetAllSubscriptions() override {
  LOG(ERROR) << "GetAllSubscriptions: Unimplemented!";
  return {};
}
|  |  | 
// Returns the JSON representation produced by the subscription store.
nlohmann::json GetSubscriptionsToJSON() override {
  return subscription_store_->ToJSON();
}
|  |  | 
// Returns the string representation produced by the subscription store.
std::string GetSubscriptionsToString() override {
  return subscription_store_->ToString();
}
|  |  | 
// Returns the JSON representation produced by the event store.
nlohmann::json GetEventsToJSON() override {
  return event_store_->ToJSON();
}
|  |  | 
// Returns the string representation produced by the event store.
std::string GetEventsToString() override {
  return event_store_->ToString();
}
|  |  | 
// Clears the event store by delegating to EventStore::Clear().
void ClearEventStore() override {
  event_store_->Clear();
}
|  |  | 
|  | nlohmann::json GetEventsBySubscriptionIdToJSON( | 
|  | size_t subscription_id) override { | 
|  | nlohmann::json json; | 
|  | // Check if the subscription_id is valid | 
|  | absl::StatusOr<const SubscriptionContext*> sc = | 
|  | subscription_store_->GetSubscription(SubscriptionId(subscription_id)); | 
|  | if (!sc.ok()) { | 
|  | json["Error"] = "Invalid subscription id"; | 
|  | return json; | 
|  | } | 
|  | json["SubscriptionId"] = subscription_id; | 
|  | json["Events"] = event_store_->GetEventsBySubscriptionId(subscription_id); | 
|  | return json; | 
|  | } | 
|  | // Invoked by Redfish event sources which are typically implementations that | 
|  | // monitor sources like DBus monitor, file i/o, socket ingress etc. | 
|  | // | 
|  | // Notify function is invoked with `event_source_id` that uniquely identifies | 
|  | // event source and `status` to indicate the subscription service of an error | 
|  | // condition at the event source which would trigger delete subscription | 
|  | // sequence. | 
|  | absl::Status Notify(EventSourceId event_source_id, | 
|  | [[maybe_unused]] const absl::Status &status) override { | 
|  | // Pull subscription context from Subscription Store | 
|  | ECCLESIA_ASSIGN_OR_RETURN( | 
|  | auto contexts, | 
|  | subscription_store_->GetSubscriptionsByEventSourceId(event_source_id)) | 
|  |  | 
|  | auto event_source_id_new = | 
|  | std::make_shared<EventSourceId>(std::move(event_source_id)); | 
|  | for (const SubscriptionContext *subscription_context : contexts) { | 
|  | // Find URIs to query. | 
|  | // A subscription context is always expected to have a set of URIs | 
|  | // associated with event source. Therefore, we don't handle the not found | 
|  | // case here. | 
|  | auto find_uris_to_query = | 
|  | subscription_context->event_source_to_uri.find(*event_source_id_new); | 
|  | auto aggregated_response = std::make_shared<AggregatedQueryResponse>( | 
|  | find_uris_to_query->second, | 
|  | [this, subscription_context, event_source_id_new]( | 
|  | const absl::flat_hash_map<std::string, nlohmann::json> | 
|  | &uri_to_response) { | 
|  | HandleEventsForSubscription(*subscription_context, | 
|  | *event_source_id_new, uri_to_response); | 
|  | }); | 
|  |  | 
|  | // Query Redfish URIs to build origin of condition. | 
|  | for (const std::string &uri : find_uris_to_query->second) { | 
|  | ECCLESIA_RETURN_IF_ERROR(subscription_backend_->Query( | 
|  | uri, | 
|  | [uri, aggregated_response]( | 
|  | const absl::Status &sc, | 
|  | const nlohmann::json &query_response) mutable { | 
|  | aggregated_response->AddQueryResponse(uri, sc, query_response); | 
|  | }, | 
|  | subscription_context->privileges)); | 
|  | } | 
|  | } | 
|  | return absl::OkStatus(); | 
|  | } | 
|  |  | 
// Just like Notify() with an additional parameter |data| that represents
// Redfish resource associated with event source.
// Not implemented yet: ignores all arguments and always returns an
// Unimplemented error.
absl::Status NotifyWithData(
    [[maybe_unused]] EventSourceId key, [[maybe_unused]] absl::Status status,
    [[maybe_unused]] const nlohmann::json &data) override {
  return absl::UnimplementedError("NotifyWithData:: Unimplemented!");
}
|  |  | 
|  | private: | 
|  | void HandleEventsForSubscription( | 
|  | const SubscriptionContext &context, const EventSourceId &source_id, | 
|  | const absl::flat_hash_map<std::string, nlohmann::json> &uri_to_response) { | 
|  | // Create event id as a function of source id and time. | 
|  | EventId event_id(context.subscription_id, source_id, absl::Now()); | 
|  |  | 
|  | // Construct RedfishEvents for subscription. | 
|  | std::vector<nlohmann::json> events = | 
|  | BuildRedfishEventsForSubscription(context, uri_to_response); | 
|  |  | 
|  | // If there are no events to dispatch, Redfish Event response is not | 
|  | // created and we return early. | 
|  | if (events.empty()) return; | 
|  |  | 
|  | nlohmann::json redfish_event_obj; | 
|  | redfish_event_obj["Id"] = event_id.redfish_event_id; | 
|  | redfish_event_obj["@odata.type"] = "#Event.v1_7_0.Event"; | 
|  | redfish_event_obj["Name"] = "RedfishEvent"; | 
|  | redfish_event_obj["Events"] = events; | 
|  |  | 
|  | // Add constructed Redfish event to event store. | 
|  | event_store_->AddNewEvent(event_id, redfish_event_obj); | 
|  |  | 
|  | // Send event to the destination. | 
|  | context.on_event_callback(redfish_event_obj); | 
|  | } | 
|  |  | 
// Backend used to subscribe to origin resources and to query Redfish URIs.
std::unique_ptr<SubscriptionBackend> subscription_backend_;
// Holds the subscription contexts created by CreateSubscription.
std::unique_ptr<SubscriptionStore> subscription_store_;
// Holds the Redfish events built by HandleEventsForSubscription.
std::unique_ptr<EventStore> event_store_;
|  | }; | 
|  |  | 
|  | }  // namespace | 
|  |  | 
|  | std::unique_ptr<SubscriptionService> CreateSubscriptionService( | 
|  | std::unique_ptr<SubscriptionBackend> subscription_backend, | 
|  | std::unique_ptr<SubscriptionStore> subscription_store, | 
|  | std::unique_ptr<EventStore> event_store) { | 
|  | return std::make_unique<SubscriptionServiceImpl>( | 
|  | std::move(subscription_backend), std::move(subscription_store), | 
|  | std::move(event_store)); | 
|  | } | 
|  |  | 
|  | }  // namespace ecclesia |