blob: 5ec0c286cdc1da843d8d000d0079abd30f7beef2 [file] [log] [blame]
#ifndef THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SERVICE_HFT_SERVICE_H_
#define THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SERVICE_HFT_SERVICE_H_
#include <cstddef>
#include <cstdint>
#include <memory>
#include <queue>
#include <string>
#include <string_view>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/functional/any_invocable.h"
#include "absl/synchronization/mutex.h"
#include "grpcpp/server_context.h"
#include "grpcpp/support/server_callback.h"
#include "grpcpp/support/status.h"
#include "hft_service.grpc.pb.h"
#include "tlbmc/subscription/manager.h"
#include "zatar/bmcweb_cert_provider.h"
namespace milotic_hft {
#ifdef INTERNAL_GRPC_GEN
using HftService = ::milotic_hft::grpc_gen::HftService;
#else
using HftService = ::milotic_hft::HftService;
#endif
namespace internal {
// The implementation of the server side reactor.
// This class is thread-safe.
class ServerReactorImpl : public grpc::ServerWriteReactor<HftResponse> {
public:
// The status of the reactor (also the stream).
enum class Status : char {
kIdle, // The stream is idle, no write is in the flight.
kWriteInFlight, // There is a write in flight.
kFinishCalled, // The stream will be terminated soon at any given time as
// `Finish` has been called. A stream can fall into this
// state because of "previous write failed" or "client
// cancelled the stream". The stream is waiting for OnDone
// to be called to declare the stream fully finished.
kFinished, // The stream has been terminated. The `reactor` pointer might
// be still valid but will be released at any given time.
};
static constexpr std::string_view StatusToString(Status status) {
switch (status) {
case Status::kIdle:
return "kIdle";
case Status::kWriteInFlight:
return "kWriteInFlight";
case Status::kFinishCalled:
return "kFinishCalled";
case Status::kFinished:
return "kFinished";
}
return "";
}
// Constructor.
// `on_done` will be called when the stream is finished.
// `maximum_event_queue_size` is the maximum number of events that can be
// queued before new events are dropped.
ServerReactorImpl(absl::AnyInvocable<void(ServerReactorImpl *)> on_done,
std::size_t maximum_event_queue_size);
// Public interfaces for services to manage the reactor below
// Finishes the stream with the given status.
// If already finished or finishing, this function is a no-op.
void SafeFinish(const grpc::Status &status) ABSL_LOCKS_EXCLUDED(mutex_);
// Adds a response to the response queue. If the queue is full, the response
// will be dropped.
// If the stream is idle, a new write will be started.
// Returns true if the response is added successfully.
bool AddResponse(HftResponse &&response) ABSL_LOCKS_EXCLUDED(mutex_);
ServerReactorImpl::Status GetStatus() const ABSL_LOCKS_EXCLUDED(mutex_);
// Reactor interfaces below
void OnWriteDone(bool ok) override;
void OnCancel() override;
void OnDone() override;
protected:
absl::AnyInvocable<void(ServerReactorImpl *)> on_done_;
const std::size_t maximum_event_queue_size_;
mutable absl::Mutex mutex_;
Status status_ ABSL_GUARDED_BY(mutex_) = Status::kIdle;
std::queue<HftResponse> response_queue_ ABSL_GUARDED_BY(mutex_);
};
} // namespace internal
struct HftServiceOptions {
std::size_t maximum_event_queue_size = 1000;
bool enable_authorization = true;
::milotic::redfish::BmcWebCertProvider *cert_provider = nullptr;
};
class HftServiceImpl : public HftService::CallbackService {
public:
explicit HftServiceImpl(
const HftServiceOptions &options,
std::unique_ptr<SubscriptionManager> subscription_manager);
grpc::ServerWriteReactor<HftResponse> *Subscribe(
grpc::CallbackServerContext *context, const HftRequest *request) override;
// An interface for exposing the current accumulative sample rate for
// a given role. Mostly for testing and debugging.
uint64_t GetAccumulativeSampleRate(const std::string &role) const
ABSL_LOCKS_EXCLUDED(mutex_);
// An interface for exposing the number of reactors. Mostly for testing and
// debugging.
uint64_t GetReactorsCount() const ABSL_LOCKS_EXCLUDED(mutex_);
// An interface for exposing the number of reactors that should decrease the
// sample rate when finished. Mostly for testing and debugging.
// A reactor might not decrease the sample rate, e.g., on authorization
// failures.
uint64_t GetDecreaseSampleRateWhenFinishedCount() const
ABSL_LOCKS_EXCLUDED(mutex_);
// An interface for exposing the number of roles that have a sample rate.
// Mostly for testing and debugging.
uint64_t GetRoleToTotalSampleRateCount() const ABSL_LOCKS_EXCLUDED(mutex_);
// An interface for exposing the number of subscription IDs for each reactor.
// Mostly for testing and debugging.
uint64_t GetReactorToSubscriptionIdsCount() const ABSL_LOCKS_EXCLUDED(mutex_);
protected:
grpc::ServerWriteReactor<HftResponse> *SubscribeWithStatus(
const HftRequest *request, const std::string &role,
grpc::Status authz_status);
const HftServiceOptions options_;
std::unique_ptr<SubscriptionManager> subscription_manager_;
mutable absl::Mutex mutex_;
absl::flat_hash_map<internal::ServerReactorImpl *,
std::shared_ptr<internal::ServerReactorImpl>>
reactor_map_ ABSL_GUARDED_BY(mutex_);
absl::flat_hash_map<std::string, uint64_t> role_to_total_sample_rate_
ABSL_GUARDED_BY(mutex_);
absl::flat_hash_set<internal::ServerReactorImpl *>
reactor_should_decrease_sample_rate_when_finished_
ABSL_GUARDED_BY(mutex_);
absl::flat_hash_map<internal::ServerReactorImpl *,
std::vector<SubscriptionManager::SubscriptionId>>
reactor_to_subscription_ids_ ABSL_GUARDED_BY(mutex_);
};
} // namespace milotic_hft
#endif // THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SERVICE_HFT_SERVICE_H_