| #ifndef THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SERVICE_HFT_SERVICE_H_ |
| #define THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SERVICE_HFT_SERVICE_H_ |
| |
| #include <cstddef> |
| #include <cstdint> |
| #include <memory> |
| #include <queue> |
| #include <string> |
| #include <string_view> |
| #include <vector> |
| |
| #include "absl/base/thread_annotations.h" |
| #include "absl/container/flat_hash_map.h" |
| #include "absl/container/flat_hash_set.h" |
| #include "absl/functional/any_invocable.h" |
| #include "absl/synchronization/mutex.h" |
| #include "grpcpp/server_context.h" |
| #include "grpcpp/support/server_callback.h" |
| #include "grpcpp/support/status.h" |
| #include "hft_service.grpc.pb.h" |
| #include "tlbmc/subscription/manager.h" |
| #include "zatar/bmcweb_cert_provider.h" |
| |
| namespace milotic_hft { |
| |
| // Chooses which generated gRPC service type the name `HftService` refers to. |
| // With INTERNAL_GRPC_GEN defined, the type is taken from the `grpc_gen` |
| // sub-namespace; otherwise it is taken directly from `milotic_hft`. |
| #if defined(INTERNAL_GRPC_GEN) |
| using HftService = ::milotic_hft::grpc_gen::HftService; |
| #else |
| using HftService = ::milotic_hft::HftService; |
| #endif  // defined(INTERNAL_GRPC_GEN) |
| |
| namespace internal { |
| |
| // Server-side write reactor that drives a single `HftResponse` stream. |
| // This class is thread-safe. |
| class ServerReactorImpl : public grpc::ServerWriteReactor<HftResponse> { |
|  public: |
|   // Lifecycle state of the reactor, which is also the state of its stream. |
|   enum class Status : char { |
|     // The stream is idle: no write is in flight. |
|     kIdle, |
|     // A write has been started and has not completed yet. |
|     kWriteInFlight, |
|     // `Finish` has been called, so the stream may terminate at any moment. |
|     // This state is entered because a previous write failed or because the |
|     // client cancelled the stream. The stream stays here until `OnDone` is |
|     // called, which declares it fully finished. |
|     kFinishCalled, |
|     // The stream has terminated. The `reactor` pointer may still be valid, |
|     // but it can be released at any time. |
|     kFinished, |
|   }; |
| |
|   // Maps `status` to the spelling of its enumerator (e.g. "kIdle"). A value |
|   // outside the enumerator list maps to the empty string. |
|   static constexpr std::string_view StatusToString(Status status) { |
|     std::string_view name = ""; |
|     switch (status) { |
|       case Status::kIdle: |
|         name = "kIdle"; |
|         break; |
|       case Status::kWriteInFlight: |
|         name = "kWriteInFlight"; |
|         break; |
|       case Status::kFinishCalled: |
|         name = "kFinishCalled"; |
|         break; |
|       case Status::kFinished: |
|         name = "kFinished"; |
|         break; |
|     } |
|     return name; |
|   } |
| |
|   // Creates a reactor. |
|   // `on_done` is invoked when the stream is finished. |
|   // `maximum_event_queue_size` caps how many events may be queued; events |
|   // arriving beyond that cap are dropped. |
|   ServerReactorImpl(absl::AnyInvocable<void(ServerReactorImpl *)> on_done, |
|                     std::size_t maximum_event_queue_size); |
| |
|   // ---- Interface used by services to manage the reactor ---- |
| |
|   // Finishes the stream with `status`. Does nothing when the stream is |
|   // already finishing or finished. |
|   void SafeFinish(const grpc::Status &status) ABSL_LOCKS_EXCLUDED(mutex_); |
| |
|   // Appends `response` to the response queue; the response is dropped when |
|   // the queue is full. If the stream is idle, a new write is started. |
|   // Returns true if the response was added successfully. |
|   bool AddResponse(HftResponse &&response) ABSL_LOCKS_EXCLUDED(mutex_); |
| |
|   // Returns the reactor's current status. |
|   Status GetStatus() const ABSL_LOCKS_EXCLUDED(mutex_); |
| |
|   // ---- grpc::ServerWriteReactor callbacks ---- |
|   void OnWriteDone(bool ok) override; |
|   void OnCancel() override; |
|   void OnDone() override; |
| |
|  protected: |
|   // NOTE(review): presumably these hold the constructor arguments of the |
|   // same names -- confirm against the constructor definition. |
|   absl::AnyInvocable<void(ServerReactorImpl *)> on_done_; |
|   const std::size_t maximum_event_queue_size_; |
| |
|   // `mutex_` guards the stream status and the queue of pending responses. |
|   mutable absl::Mutex mutex_; |
|   Status status_ ABSL_GUARDED_BY(mutex_) = Status::kIdle; |
|   std::queue<HftResponse> response_queue_ ABSL_GUARDED_BY(mutex_); |
| }; |
| |
| } // namespace internal |
| |
| // Options accepted by the `HftServiceImpl` constructor. |
| struct HftServiceOptions { |
|   // Event-queue capacity; defaults to 1000. |
|   // NOTE(review): presumably forwarded to each stream's reactor as its |
|   // `maximum_event_queue_size` -- confirm in the service implementation. |
|   std::size_t maximum_event_queue_size{1000}; |
|   // Authorization switch; enabled by default. |
|   bool enable_authorization{true}; |
|   // Certificate provider; null by default. |
|   // NOTE(review): held as a raw pointer, so presumably not owned by this |
|   // struct -- confirm the expected lifetime with the caller. |
|   ::milotic::redfish::BmcWebCertProvider *cert_provider{nullptr}; |
| }; |
| |
| // Callback-API implementation of `HftService`. |
| class HftServiceImpl : public HftService::CallbackService { |
|  public: |
|   // Builds the service from `options` and takes ownership of |
|   // `subscription_manager`. |
|   explicit HftServiceImpl( |
|       const HftServiceOptions &options, |
|       std::unique_ptr<SubscriptionManager> subscription_manager); |
| |
|   // Handler for the `Subscribe` RPC. Returns the write reactor through which |
|   // `HftResponse` messages are streamed for this call. |
|   grpc::ServerWriteReactor<HftResponse> *Subscribe( |
|       grpc::CallbackServerContext *context, const HftRequest *request) override; |
| |
|   // ---- Introspection helpers, mostly for testing and debugging ---- |
| |
|   // Exposes the current accumulative sample rate for `role`. |
|   std::uint64_t GetAccumulativeSampleRate(const std::string &role) const |
|       ABSL_LOCKS_EXCLUDED(mutex_); |
| |
|   // Exposes the number of reactors. |
|   std::uint64_t GetReactorsCount() const ABSL_LOCKS_EXCLUDED(mutex_); |
| |
|   // Exposes the number of reactors that should decrease the sample rate when |
|   // they finish. Not every reactor does so; for example, one created after |
|   // an authorization failure might not. |
|   std::uint64_t GetDecreaseSampleRateWhenFinishedCount() const |
|       ABSL_LOCKS_EXCLUDED(mutex_); |
| |
|   // Exposes the number of roles that have a sample rate. |
|   std::uint64_t GetRoleToTotalSampleRateCount() const |
|       ABSL_LOCKS_EXCLUDED(mutex_); |
| |
|   // Exposes a count taken from the reactor-to-subscription-IDs bookkeeping. |
|   // NOTE(review): presumably the number of reactors that have subscription |
|   // IDs recorded -- confirm against the definition. |
|   std::uint64_t GetReactorToSubscriptionIdsCount() const |
|       ABSL_LOCKS_EXCLUDED(mutex_); |
| |
|  protected: |
|   // Variant of `Subscribe` that takes the caller's `role` and an |
|   // authorization status directly. |
|   // NOTE(review): presumably invoked by `Subscribe` once authorization has |
|   // been evaluated -- confirm in the implementation file. |
|   grpc::ServerWriteReactor<HftResponse> *SubscribeWithStatus( |
|       const HftRequest *request, const std::string &role, |
|       grpc::Status authz_status); |
| |
|   const HftServiceOptions options_; |
|   std::unique_ptr<SubscriptionManager> subscription_manager_; |
| |
|   // `mutex_` guards all of the bookkeeping containers below. |
|   mutable absl::Mutex mutex_; |
|   // Reactors keyed by their raw address; the mapped `shared_ptr` shares |
|   // ownership of the reactor. |
|   absl::flat_hash_map<internal::ServerReactorImpl *, |
|                       std::shared_ptr<internal::ServerReactorImpl>> |
|       reactor_map_ ABSL_GUARDED_BY(mutex_); |
|   // Total sample rate per role. |
|   absl::flat_hash_map<std::string, std::uint64_t> role_to_total_sample_rate_ |
|       ABSL_GUARDED_BY(mutex_); |
|   // Reactors that should decrease the sample rate when they finish. |
|   absl::flat_hash_set<internal::ServerReactorImpl *> |
|       reactor_should_decrease_sample_rate_when_finished_ |
|           ABSL_GUARDED_BY(mutex_); |
|   // Subscription IDs recorded for each reactor. |
|   absl::flat_hash_map<internal::ServerReactorImpl *, |
|                       std::vector<SubscriptionManager::SubscriptionId>> |
|       reactor_to_subscription_ids_ ABSL_GUARDED_BY(mutex_); |
| }; |
| |
| } // namespace milotic_hft |
| |
| #endif // THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SERVICE_HFT_SERVICE_H_ |