/*
 * Copyright 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "subscription_store_impl.h"

#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_map.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
#include "absl/types/span.h"
#include "subscription.h"
#include "nlohmann/json.hpp"

namespace ecclesia {

namespace {

// Concrete implementation of SubscriptionStore.
// Thread safe
class SubscriptionStoreImpl : public SubscriptionStore {
 public:
  // Adds a new subscription with the given subscription ID and event source IDs
  absl::Status AddNewSubscription(
      std::unique_ptr<SubscriptionContext> subscription_context) override {
    if (subscription_context == nullptr) {
      return absl::InvalidArgumentError("Subscription context is null.");
    }
    if (subscription_context->subscription_id.Id() <= 0) {
      return absl::InvalidArgumentError("Invalid Id, must be >0");
    }
    if (subscription_context->event_source_to_uri.empty()) {
      return absl::InvalidArgumentError("Event source to uri is empty.");
    }

    SubscriptionId subscription_id = subscription_context->subscription_id;
    const SubscriptionContext* subscription_context_raw_ptr =
        subscription_context.get();

    // Insert a subscription id if an equivalent key does not
    // already exist within the map.
    {
      absl::MutexLock lock(&subscriptions_mutex_);
      auto [it, inserted] = subscriptions_.insert(
          {subscription_id, std::move(subscription_context)});

      // subscription_context cannot be used past this point.
      if (!inserted) {
        return absl::AlreadyExistsError(absl::StrCat(
            "Subscription Id ", subscription_id.Id(), " already exists."));
      }
    }

    std::vector<EventSourceId> event_source_ids_to_add;
    for (const auto& [event_source_id, value] :
         subscription_context_raw_ptr->event_source_to_uri) {
      event_source_ids_to_add.push_back(event_source_id);
      absl::MutexLock lock(&subscriptions_by_event_sources_mutex_);
      auto iter = subscriptions_by_event_sources_.find(event_source_id);
      if (iter == subscriptions_by_event_sources_.end()) {
        subscriptions_by_event_sources_.insert(
            {event_source_id, {subscription_context_raw_ptr}});
      } else {
        iter->second.push_back(subscription_context_raw_ptr);
      }
    }

    {
      absl::MutexLock lock(&event_sources_by_subscription_mutex_);
      event_sources_by_subscription_.insert(
          {subscription_context_raw_ptr->subscription_id,
           std::move(event_source_ids_to_add)});
    }
    return absl::OkStatus();
  }

  // Deletes the subscription with the given subscription ID
  void DeleteSubscription(const SubscriptionId& subscription_id) override {
    {
      absl::MutexLock lock(&event_sources_by_subscription_mutex_);
      auto it = event_sources_by_subscription_.find(subscription_id);
      if (it != event_sources_by_subscription_.end()) {
        std::vector<EventSourceId>& event_source_ids = it->second;
        for (auto& event_source_id : event_source_ids) {
          DeleteSubscriptionFromEventSources(event_source_id, subscription_id);
        }
        event_sources_by_subscription_.erase(subscription_id);
      }
    }
    // Must be the last one to cleanup. The Other container has raw pointers to
    // the uniqe_ptrs in this container.
    absl::MutexLock lock(&subscriptions_mutex_);
    subscriptions_.erase(subscription_id);
  }

  absl::StatusOr<const SubscriptionContext*> GetSubscription(
      const SubscriptionId& subscription_id) override {
    absl::MutexLock lock(&subscriptions_mutex_);
    auto it = subscriptions_.find(subscription_id);
    if (it == subscriptions_.end()) {
      return absl::NotFoundError(absl::StrCat(
          "Subscription with ID ", subscription_id.Id(), " not found."));
    }
    return it->second.get();
  }

  // Retrieves the subscriptions associated with the given event source ID
  absl::StatusOr<absl::Span<const ecclesia::SubscriptionContext* const>>
  GetSubscriptionsByEventSourceId(const EventSourceId& source_id) override {
    absl::MutexLock lock(&subscriptions_by_event_sources_mutex_);
    auto it = subscriptions_by_event_sources_.find(source_id);
    if (it == subscriptions_by_event_sources_.end()) {
      return absl::NotFoundError(absl::StrCat(
          "Event source with ID ", source_id.ToString(), " not found."));
    }
    return it->second;
  }

  // Converts SubscriptionStore to JSON format
  nlohmann::json ToJSON() override {
    nlohmann::json json;
    nlohmann::json& subscriptions_json = json["Subscriptions"];
    subscriptions_json = nlohmann::json::array();
    absl::MutexLock lock(&subscriptions_mutex_);
    for (const auto& key_value_pair : subscriptions_) {
      nlohmann::json subscription_json;
      subscription_json["SubscriptionId"] = key_value_pair.first.Id();
      nlohmann::json& uri_json = subscription_json["URI"];
      uri_json = nlohmann::json::array();
      for (const auto& [event_source_id, set_of_uris] :
          key_value_pair.second->event_source_to_uri) {
        for (const auto& uri : set_of_uris) {
          uri_json.push_back(uri);
        }
      }
      nlohmann::json& trigger_json = subscription_json["Triggers"];
      trigger_json = nlohmann::json::array();
      for (const auto& [trigger_id, trigger] :
          key_value_pair.second->id_to_triggers) {
          trigger_json.push_back(trigger.ToJSON());
      }
      subscriptions_json.push_back(subscription_json);
    }
    json["NumSubscriptions"] = subscriptions_json.size();
    return json;
  }

  // Converts SubscriptionStore to string format
  std::string ToString() override { return ToJSON().dump(); }

 private:
  void DeleteSubscriptionFromEventSources(
      const EventSourceId& event_source_id,
      const SubscriptionId& subscription_id) {
    absl::MutexLock lock(&subscriptions_by_event_sources_mutex_);
    std::vector<const SubscriptionContext*>& subscription_contexts =
        subscriptions_by_event_sources_[event_source_id];
    for (auto iter = subscription_contexts.begin();
         iter != subscription_contexts.end(); ++iter) {
      if ((*iter)->subscription_id != subscription_id) {
        continue;
      }
      subscription_contexts.erase(iter);
      if (subscription_contexts.empty()) {
        subscriptions_by_event_sources_.erase(event_source_id);
      }
      return;
    }
  }

  absl::Mutex subscriptions_mutex_;
  // pointer-stability of values (but not keys) is needed
  absl::flat_hash_map<SubscriptionId, std::unique_ptr<SubscriptionContext>>
      subscriptions_ ABSL_GUARDED_BY(subscriptions_mutex_);

  absl::Mutex subscriptions_by_event_sources_mutex_;
  absl::flat_hash_map<EventSourceId, std::vector<const SubscriptionContext*>>
      subscriptions_by_event_sources_
          ABSL_GUARDED_BY(subscriptions_by_event_sources_mutex_);

  absl::Mutex event_sources_by_subscription_mutex_;
  absl::flat_hash_map<SubscriptionId, std::vector<EventSourceId>>
      event_sources_by_subscription_
          ABSL_GUARDED_BY(event_sources_by_subscription_mutex_);
};

}  // namespace

std::unique_ptr<SubscriptionStore> CreateSubscriptionStore() {
  return std::make_unique<SubscriptionStoreImpl>();
}

}  // namespace ecclesia
