|  | /* | 
|  | * Copyright 2024 Google LLC | 
|  | * | 
|  | * Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | * you may not use this file except in compliance with the License. | 
|  | * You may obtain a copy of the License at | 
|  | * | 
|  | *      http://www.apache.org/licenses/LICENSE-2.0 | 
|  | * | 
|  | * Unless required by applicable law or agreed to in writing, software | 
|  | * distributed under the License is distributed on an "AS IS" BASIS, | 
|  | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | * See the License for the specific language governing permissions and | 
|  | * limitations under the License. | 
|  | */ | 
|  |  | 
|  | #include "subscription_store_impl.h" | 
|  |  | 
|  | #include <memory> | 
|  | #include <string> | 
|  | #include <utility> | 
|  | #include <vector> | 
|  |  | 
|  | #include "absl/base/thread_annotations.h" | 
|  | #include "absl/container/flat_hash_map.h" | 
|  | #include "absl/status/status.h" | 
|  | #include "absl/status/statusor.h" | 
|  | #include "absl/strings/str_cat.h" | 
|  | #include "absl/strings/string_view.h" | 
|  | #include "absl/synchronization/mutex.h" | 
|  | #include "absl/types/span.h" | 
|  | #include "subscription.h" | 
|  | #include "nlohmann/json.hpp" | 
|  |  | 
|  | namespace ecclesia { | 
|  |  | 
|  | namespace { | 
|  |  | 
|  | // Concrete implementation of SubscriptionStore. | 
|  | // Thread safe | 
|  | class SubscriptionStoreImpl : public SubscriptionStore { | 
|  | public: | 
|  | // Adds a new subscription with the given subscription ID and event source IDs | 
|  | absl::Status AddNewSubscription( | 
|  | std::unique_ptr<SubscriptionContext> subscription_context) override { | 
|  | if (subscription_context == nullptr) { | 
|  | return absl::InvalidArgumentError("Subscription context is null."); | 
|  | } | 
|  | if (subscription_context->subscription_id.Id() <= 0) { | 
|  | return absl::InvalidArgumentError("Invalid Id, must be >0"); | 
|  | } | 
|  | if (subscription_context->event_source_to_uri.empty()) { | 
|  | return absl::InvalidArgumentError("Event source to uri is empty."); | 
|  | } | 
|  |  | 
|  | SubscriptionId subscription_id = subscription_context->subscription_id; | 
|  | const SubscriptionContext* subscription_context_raw_ptr = | 
|  | subscription_context.get(); | 
|  |  | 
|  | // Insert a subscription id if an equivalent key does not | 
|  | // already exist within the map. | 
|  | { | 
|  | absl::MutexLock lock(&subscriptions_mutex_); | 
|  | auto [it, inserted] = subscriptions_.insert( | 
|  | {subscription_id, std::move(subscription_context)}); | 
|  |  | 
|  | // subscription_context cannot be used past this point. | 
|  | if (!inserted) { | 
|  | return absl::AlreadyExistsError(absl::StrCat( | 
|  | "Subscription Id ", subscription_id.Id(), " already exists.")); | 
|  | } | 
|  | } | 
|  |  | 
|  | std::vector<EventSourceId> event_source_ids_to_add; | 
|  | for (const auto& [event_source_id, value] : | 
|  | subscription_context_raw_ptr->event_source_to_uri) { | 
|  | event_source_ids_to_add.push_back(event_source_id); | 
|  | absl::MutexLock lock(&subscriptions_by_event_sources_mutex_); | 
|  | auto iter = subscriptions_by_event_sources_.find(event_source_id); | 
|  | if (iter == subscriptions_by_event_sources_.end()) { | 
|  | subscriptions_by_event_sources_.insert( | 
|  | {event_source_id, {subscription_context_raw_ptr}}); | 
|  | } else { | 
|  | iter->second.push_back(subscription_context_raw_ptr); | 
|  | } | 
|  | } | 
|  |  | 
|  | { | 
|  | absl::MutexLock lock(&event_sources_by_subscription_mutex_); | 
|  | event_sources_by_subscription_.insert( | 
|  | {subscription_context_raw_ptr->subscription_id, | 
|  | std::move(event_source_ids_to_add)}); | 
|  | } | 
|  | return absl::OkStatus(); | 
|  | } | 
|  |  | 
|  | // Deletes the subscription with the given subscription ID | 
|  | void DeleteSubscription(const SubscriptionId& subscription_id) override { | 
|  | { | 
|  | absl::MutexLock lock(&event_sources_by_subscription_mutex_); | 
|  | auto it = event_sources_by_subscription_.find(subscription_id); | 
|  | if (it != event_sources_by_subscription_.end()) { | 
|  | std::vector<EventSourceId>& event_source_ids = it->second; | 
|  | for (auto& event_source_id : event_source_ids) { | 
|  | DeleteSubscriptionFromEventSources(event_source_id, subscription_id); | 
|  | } | 
|  | event_sources_by_subscription_.erase(subscription_id); | 
|  | } | 
|  | } | 
|  | // Must be the last one to cleanup. The Other container has raw pointers to | 
|  | // the uniqe_ptrs in this container. | 
|  | absl::MutexLock lock(&subscriptions_mutex_); | 
|  | subscriptions_.erase(subscription_id); | 
|  | } | 
|  |  | 
|  | absl::StatusOr<const SubscriptionContext*> GetSubscription( | 
|  | const SubscriptionId& subscription_id) override { | 
|  | absl::MutexLock lock(&subscriptions_mutex_); | 
|  | auto it = subscriptions_.find(subscription_id); | 
|  | if (it == subscriptions_.end()) { | 
|  | return absl::NotFoundError(absl::StrCat( | 
|  | "Subscription with ID ", subscription_id.Id(), " not found.")); | 
|  | } | 
|  | return it->second.get(); | 
|  | } | 
|  |  | 
|  | // Retrieves the subscriptions associated with the given event source ID | 
|  | absl::StatusOr<absl::Span<const ecclesia::SubscriptionContext* const>> | 
|  | GetSubscriptionsByEventSourceId(const EventSourceId& source_id) override { | 
|  | absl::MutexLock lock(&subscriptions_by_event_sources_mutex_); | 
|  | auto it = subscriptions_by_event_sources_.find(source_id); | 
|  | if (it == subscriptions_by_event_sources_.end()) { | 
|  | return absl::NotFoundError(absl::StrCat( | 
|  | "Event source with ID ", source_id.ToString(), " not found.")); | 
|  | } | 
|  | return it->second; | 
|  | } | 
|  |  | 
|  | // Converts SubscriptionStore to JSON format | 
|  | nlohmann::json ToJSON() override { | 
|  | nlohmann::json json; | 
|  | nlohmann::json& subscriptions_json = json["Subscriptions"]; | 
|  | subscriptions_json = nlohmann::json::array(); | 
|  | absl::MutexLock lock(&subscriptions_mutex_); | 
|  | for (const auto& key_value_pair : subscriptions_) { | 
|  | nlohmann::json subscription_json; | 
|  | subscription_json["SubscriptionId"] = key_value_pair.first.Id(); | 
|  | nlohmann::json& uri_json = subscription_json["URI"]; | 
|  | uri_json = nlohmann::json::array(); | 
|  | for (const auto& [event_source_id, set_of_uris] : | 
|  | key_value_pair.second->event_source_to_uri) { | 
|  | for (const auto& uri : set_of_uris) { | 
|  | uri_json.push_back(uri); | 
|  | } | 
|  | } | 
|  | nlohmann::json& trigger_json = subscription_json["Triggers"]; | 
|  | trigger_json = nlohmann::json::array(); | 
|  | for (const auto& [trigger_id, trigger] : | 
|  | key_value_pair.second->id_to_triggers) { | 
|  | trigger_json.push_back(trigger.ToJSON()); | 
|  | } | 
|  | subscriptions_json.push_back(subscription_json); | 
|  | } | 
|  | json["NumSubscriptions"] = subscriptions_json.size(); | 
|  | return json; | 
|  | } | 
|  |  | 
|  | // Converts SubscriptionStore to string format | 
|  | std::string ToString() override { return ToJSON().dump(); } | 
|  |  | 
|  | private: | 
|  | void DeleteSubscriptionFromEventSources( | 
|  | const EventSourceId& event_source_id, | 
|  | const SubscriptionId& subscription_id) { | 
|  | absl::MutexLock lock(&subscriptions_by_event_sources_mutex_); | 
|  | std::vector<const SubscriptionContext*>& subscription_contexts = | 
|  | subscriptions_by_event_sources_[event_source_id]; | 
|  | for (auto iter = subscription_contexts.begin(); | 
|  | iter != subscription_contexts.end(); ++iter) { | 
|  | if ((*iter)->subscription_id != subscription_id) { | 
|  | continue; | 
|  | } | 
|  | subscription_contexts.erase(iter); | 
|  | if (subscription_contexts.empty()) { | 
|  | subscriptions_by_event_sources_.erase(event_source_id); | 
|  | } | 
|  | return; | 
|  | } | 
|  | } | 
|  |  | 
|  | absl::Mutex subscriptions_mutex_; | 
|  | // pointer-stability of values (but not keys) is needed | 
|  | absl::flat_hash_map<SubscriptionId, std::unique_ptr<SubscriptionContext>> | 
|  | subscriptions_ ABSL_GUARDED_BY(subscriptions_mutex_); | 
|  |  | 
|  | absl::Mutex subscriptions_by_event_sources_mutex_; | 
|  | absl::flat_hash_map<EventSourceId, std::vector<const SubscriptionContext*>> | 
|  | subscriptions_by_event_sources_ | 
|  | ABSL_GUARDED_BY(subscriptions_by_event_sources_mutex_); | 
|  |  | 
|  | absl::Mutex event_sources_by_subscription_mutex_; | 
|  | absl::flat_hash_map<SubscriptionId, std::vector<EventSourceId>> | 
|  | event_sources_by_subscription_ | 
|  | ABSL_GUARDED_BY(event_sources_by_subscription_mutex_); | 
|  | }; | 
|  |  | 
|  | }  // namespace | 
|  |  | 
|  | std::unique_ptr<SubscriptionStore> CreateSubscriptionStore() { | 
|  | return std::make_unique<SubscriptionStoreImpl>(); | 
|  | } | 
|  |  | 
|  | }  // namespace ecclesia |