#ifndef THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SERVICE_HFT_SERVICE_H_
#define THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SERVICE_HFT_SERVICE_H_

#include <cstddef>
#include <cstdint>
#include <memory>
#include <queue>
#include <string>
#include <string_view>
#include <vector>

#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/functional/any_invocable.h"
#include "absl/synchronization/mutex.h"
#include "grpcpp/server_context.h"
#include "grpcpp/support/server_callback.h"
#include "grpcpp/support/status.h"
#include "nlohmann/json_fwd.hpp"
#include "hft_service.grpc.pb.h"
#include "tlbmc/subscription/manager.h"
#include "zatar/bmcweb_cert_provider.h"

namespace milotic_hft {

#ifdef INTERNAL_GRPC_GEN
using HftService = ::milotic_hft::grpc_gen::HftService;
#else
using HftService = ::milotic_hft::HftService;
#endif

namespace internal {

// The implementation of the server side reactor.
// This class is thread-safe.
class ServerReactorImpl : public grpc::ServerWriteReactor<HftResponse> {
 public:
  // The status of the reactor (also the stream).
  enum class Status : char {
    kIdle,           // The stream is idle, no write is in the flight.
    kWriteInFlight,  // There is a write in flight.
    kFinishCalled,   // The stream will be terminated soon at any given time as
                     // `Finish` has been called. A stream can fall into this
                     // state because of "previous write failed" or "client
                     // cancelled the stream". The stream is waiting for OnDone
                     // to be called to declare the stream fully finished.
    kFinished,  // The stream has been terminated. The `reactor` pointer might
                // be still valid but will be released at any given time.
  };
  static constexpr std::string_view StatusToString(Status status) {
    switch (status) {
      case Status::kIdle:
        return "kIdle";
      case Status::kWriteInFlight:
        return "kWriteInFlight";
      case Status::kFinishCalled:
        return "kFinishCalled";
      case Status::kFinished:
        return "kFinished";
    }
    return "";
  }

  // Constructor.
  // `on_done` will be called when the stream is finished.
  // `maximum_event_queue_size` is the maximum number of events that can be
  // queued before new events are dropped.
  ServerReactorImpl(absl::AnyInvocable<void(ServerReactorImpl *)> on_done,
                    std::size_t maximum_event_queue_size);

  // Public interfaces for services to manage the reactor below

  // Finishes the stream with the given status.
  // If already finished or finishing, this function is a no-op.
  void SafeFinish(const grpc::Status &status) ABSL_LOCKS_EXCLUDED(mutex_);

  // Adds a response to the response queue. If the queue is full, the response
  // will be dropped.
  // If the stream is idle, a new write will be started.
  // Returns true if the response is added successfully.
  bool AddResponse(HftResponse &&response) ABSL_LOCKS_EXCLUDED(mutex_);

  ServerReactorImpl::Status GetStatus() const ABSL_LOCKS_EXCLUDED(mutex_);

  // Reactor interfaces below
  void OnWriteDone(bool ok) override;
  void OnCancel() override;
  void OnDone() override;

 protected:
  absl::AnyInvocable<void(ServerReactorImpl *)> on_done_;
  const std::size_t maximum_event_queue_size_;

  mutable absl::Mutex mutex_;
  Status status_ ABSL_GUARDED_BY(mutex_) = Status::kIdle;
  std::queue<HftResponse> response_queue_ ABSL_GUARDED_BY(mutex_);
};

}  // namespace internal

struct HftServiceOptions {
  std::size_t maximum_event_queue_size = 1000;
  bool enable_authorization = true;
  ::milotic::redfish::BmcWebCertProvider *cert_provider = nullptr;
};

class HftServiceImpl : public HftService::CallbackService {
 public:
  explicit HftServiceImpl(
      const HftServiceOptions &options,
      std::unique_ptr<SubscriptionManager> subscription_manager);

  grpc::ServerWriteReactor<HftResponse> *Subscribe(
      grpc::CallbackServerContext *context, const HftRequest *request) override;

  // An interface for exposing the current accumulative sample rate for
  // a given role. Mostly for testing and debugging.
  uint64_t GetAccumulativeSampleRate(const std::string &role) const
      ABSL_LOCKS_EXCLUDED(mutex_);

  // An interface for exposing the number of reactors. Mostly for testing and
  // debugging.
  uint64_t GetReactorsCount() const ABSL_LOCKS_EXCLUDED(mutex_);

  // An interface for exposing the number of reactors that should decrease the
  // sample rate when finished. Mostly for testing and debugging.
  // A reactor might not decrease the sample rate, e.g., on authorization
  // failures.
  uint64_t GetDecreaseSampleRateWhenFinishedCount() const
      ABSL_LOCKS_EXCLUDED(mutex_);

  // An interface for exposing the number of roles that have a sample rate.
  // Mostly for testing and debugging.
  uint64_t GetRoleToTotalSampleRateCount() const ABSL_LOCKS_EXCLUDED(mutex_);

  // An interface for exposing the number of subscription IDs for each reactor.
  // Mostly for testing and debugging.
  uint64_t GetReactorToSubscriptionIdsCount() const ABSL_LOCKS_EXCLUDED(mutex_);

  nlohmann::json ToJson() const ABSL_LOCKS_EXCLUDED(mutex_);

 protected:
  grpc::ServerWriteReactor<HftResponse> *SubscribeWithStatus(
      const HftRequest *request, const std::string &role,
      grpc::Status authz_status);

  const HftServiceOptions options_;
  std::unique_ptr<SubscriptionManager> subscription_manager_;

  mutable absl::Mutex mutex_;
  absl::flat_hash_map<internal::ServerReactorImpl *,
                      std::shared_ptr<internal::ServerReactorImpl>>
      reactor_map_ ABSL_GUARDED_BY(mutex_);
  absl::flat_hash_map<std::string, uint64_t> role_to_total_sample_rate_
      ABSL_GUARDED_BY(mutex_);
  absl::flat_hash_set<internal::ServerReactorImpl *>
      reactor_should_decrease_sample_rate_when_finished_
          ABSL_GUARDED_BY(mutex_);
  absl::flat_hash_map<internal::ServerReactorImpl *,
                      std::vector<SubscriptionManager::SubscriptionId>>
      reactor_to_subscription_ids_ ABSL_GUARDED_BY(mutex_);
};

}  // namespace milotic_hft

#endif  // THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SERVICE_HFT_SERVICE_H_
