|  | #ifndef PRODUCTION_BORG_MGMT_NODE_PROXY_SAFEPOWER_SAFEPOWER_AGENT_STATE_CHANGE_REACTOR_H_ | 
|  | #define PRODUCTION_BORG_MGMT_NODE_PROXY_SAFEPOWER_SAFEPOWER_AGENT_STATE_CHANGE_REACTOR_H_ | 
|  |  | 
|  | #include <memory> | 
|  | #include <queue> | 
|  |  | 
|  | #include "daemon_context.h" | 
|  | #include "disruption_manager.h" | 
|  | #include "state_updater.h" | 
|  | #include "absl/base/nullability.h" | 
|  | #include "absl/base/thread_annotations.h" | 
|  | #include "absl/log/check.h" | 
|  | #include "absl/log/log.h" | 
|  | #include "absl/synchronization/mutex.h" | 
|  | #include "grpcpp/impl/call_op_set.h" | 
|  | #include "grpcpp/support/server_callback.h" | 
|  | #include "grpcpp/support/status.h" | 
|  |  | 
|  | namespace safepower_agent { | 
|  | template <typename StateProto> | 
|  | class StateChangeWriteReactor : public grpc::ServerWriteReactor<StateProto> { | 
|  | class UpdateListener : public StateUpdater<StateProto>::Listener { | 
|  | public: | 
|  | explicit UpdateListener(StateChangeWriteReactor<StateProto>& reactor) | 
|  | : reactor_(reactor) {} | 
|  | void UpdateState(const StateProto& /*current_state*/, | 
|  | const StateProto& update) override { | 
|  | reactor_.PushUpdate(update); | 
|  | } | 
|  | void Done() override { reactor_.Stop(); } | 
|  |  | 
|  | private: | 
|  | StateChangeWriteReactor<StateProto>& reactor_; | 
|  | }; | 
|  | friend class UpdateListener; | 
|  |  | 
|  | using grpc::ServerWriteReactor<StateProto>::StartWrite; | 
|  | using grpc::ServerWriteReactor<StateProto>::StartWriteLast; | 
|  |  | 
|  | public: | 
|  | StateChangeWriteReactor() : listener_(*this) {}; | 
|  |  | 
|  | using grpc::ServerWriteReactor<StateProto>::Finish; | 
|  |  | 
|  | void OnDone() override { self_.reset(); } | 
|  | void OnCancel() override { | 
|  | LOG(INFO) << "Stream cancelled"; | 
|  | Stop(/*wait=*/false); | 
|  | } | 
|  |  | 
|  | void OnWriteDone(bool ok) ABSL_LOCKS_EXCLUDED(mutex_) override { | 
|  | if (!ok) { | 
|  | LOG(ERROR) << "Write closed"; | 
|  | Stop(); | 
|  | Finish(grpc::Status(grpc::StatusCode::CANCELLED, "Write closed")); | 
|  | return; | 
|  | } | 
|  | absl::MutexLock lock(&mutex_); | 
|  | CHECK(!state_queue_.empty()); | 
|  | state_queue_.pop(); | 
|  | if (state_queue_.empty()) { | 
|  | if (stopped_) { | 
|  | LOG(INFO) << "Stream stopped"; | 
|  | Finish(grpc::Status::OK); | 
|  | } | 
|  | return; | 
|  | } | 
|  | StartWrite(&state_queue_.front(), grpc::WriteOptions()); | 
|  | } | 
|  |  | 
|  | void Connect(const | 
|  | std::shared_ptr<StateUpdater<StateProto>>& updater) | 
|  | ABSL_LOCKS_EXCLUDED(mutex_) { | 
|  | listener_.Listen(updater); | 
|  | disruption_callback_handle_ = | 
|  | DaemonContext::Get().disruption_manager().OnDisruptionStart([this] { | 
|  | LOG(INFO) << "Stopping stream because disruption is expected"; | 
|  | Stop(/*wait=*/true); | 
|  | }); | 
|  | } | 
|  |  | 
|  | static StateChangeWriteReactor*  Detach( | 
|  | std::unique_ptr<StateChangeWriteReactor<StateProto>> | 
|  | reactor) { | 
|  | StateChangeWriteReactor*  ptr = reactor.get(); | 
|  | if (ptr == nullptr) return nullptr; | 
|  | ptr->self_ = std::move(reactor); | 
|  | return ptr; | 
|  | } | 
|  |  | 
|  | bool stopped() const ABSL_LOCKS_EXCLUDED(mutex_) { | 
|  | absl::MutexLock lock(&mutex_); | 
|  | return stopped_; | 
|  | } | 
|  |  | 
|  | private: | 
|  | void Stop(bool wait = false) ABSL_LOCKS_EXCLUDED(mutex_) { | 
|  | absl::MutexLock lock(&mutex_); | 
|  | if (!stopped_) { | 
|  | stopped_ = true; | 
|  | if (state_queue_.empty()) { | 
|  | LOG(INFO) << "Stream stopped"; | 
|  | Finish(grpc::Status::OK); | 
|  | } | 
|  | } | 
|  | if (!wait) return; | 
|  | mutex_.Await(absl::Condition( | 
|  | +[](std::queue<StateProto>* state_queue) { | 
|  | return state_queue->empty(); | 
|  | }, | 
|  | &state_queue_)); | 
|  | } | 
|  |  | 
|  | void PushUpdate(const StateProto& update) ABSL_LOCKS_EXCLUDED(mutex_) { | 
|  | absl::MutexLock lock(&mutex_); | 
|  | if (stopped_) { | 
|  | LOG(WARNING) << "Dropping update because the stream is stopped"; | 
|  | return; | 
|  | } | 
|  | state_queue_.push(update); | 
|  | // If this is not the first item, OnWriteDone will start writing subsequent | 
|  | // items. Otherwise, we need to start writing. | 
|  | if (state_queue_.size() == 1) { | 
|  | StartWrite(&state_queue_.front(), grpc::WriteOptions()); | 
|  | } | 
|  | } | 
|  |  | 
|  | mutable absl::Mutex mutex_; | 
|  | std::unique_ptr<StateChangeWriteReactor> self_; | 
|  | bool stopped_ ABSL_GUARDED_BY(mutex_) = false; | 
|  | std::queue<StateProto> state_queue_ ABSL_GUARDED_BY(mutex_); | 
|  | DisruptionManager::CallbackHandle disruption_callback_handle_; | 
|  | // This should be destroyed first so that the listener callbacks will not be | 
|  | // called after any other members in this class are destroyed. | 
|  | UpdateListener listener_; | 
|  | }; | 
|  |  | 
|  | }  // namespace safepower_agent | 
|  |  | 
|  | #endif  // PRODUCTION_BORG_MGMT_NODE_PROXY_SAFEPOWER_SAFEPOWER_AGENT_STATE_CHANGE_REACTOR_H_ |