| #ifndef PRODUCTION_SUSHID_SAFEPOWER_AGENT_STATE_CHANGE_REACTOR_H_ |
| #define PRODUCTION_SUSHID_SAFEPOWER_AGENT_STATE_CHANGE_REACTOR_H_ |
| |
| #include <memory> |
| #include <queue> |
| |
| #include "state_updater.h" |
| #include "absl/base/nullability.h" |
| #include "absl/base/thread_annotations.h" |
| #include "absl/log/check.h" |
| #include "absl/log/log.h" |
| #include "absl/synchronization/mutex.h" |
| #include "grpcpp/impl/call_op_set.h" |
| #include "grpcpp/support/server_callback.h" |
| #include "grpcpp/support/status.h" |
| |
| namespace safepower_agent { |
| template <typename StateProto> |
| class StateChangeWriteReactor : public grpc::ServerWriteReactor<StateProto> { |
| class UpdateListener : public StateUpdater<StateProto>::Listener { |
| public: |
| explicit UpdateListener(StateChangeWriteReactor<StateProto>& reactor) |
| : reactor_(reactor) {} |
| void UpdateState(const StateProto& /*current_state*/, |
| const StateProto& update) override { |
| reactor_.PushUpdate(update); |
| } |
| void Done() override { reactor_.Stop(); } |
| |
| private: |
| StateChangeWriteReactor<StateProto>& reactor_; |
| }; |
| friend class UpdateListener; |
| |
| using grpc::ServerWriteReactor<StateProto>::StartWrite; |
| using grpc::ServerWriteReactor<StateProto>::StartWriteLast; |
| |
| public: |
| StateChangeWriteReactor() : listener_(*this) {}; |
| ~StateChangeWriteReactor() override { Stop(); } |
| |
| using grpc::ServerWriteReactor<StateProto>::Finish; |
| |
| void OnDone() override { self_.reset(); } |
| void OnCancel() override { |
| Stop(); |
| Finish(grpc::Status::CANCELLED); |
| } |
| |
| void OnWriteDone(bool ok) ABSL_LOCKS_EXCLUDED(mutex_) override { |
| if (!ok) { |
| LOG(ERROR) << "Write failed"; |
| Stop(); |
| Finish(grpc::Status(grpc::StatusCode::UNKNOWN, "Unexpected failure")); |
| return; |
| } |
| absl::MutexLock lock(&mutex_); |
| CHECK(!state_queue_.empty()); |
| state_queue_.pop(); |
| if (state_queue_.empty()) { |
| if (stopped_) { |
| Finish(grpc::Status::OK); |
| } |
| return; |
| } |
| StartWrite(&state_queue_.front(), grpc::WriteOptions()); |
| } |
| |
| void Connect( |
| const absl::Nonnull<std::shared_ptr<StateUpdater<StateProto>>>& updater) |
| ABSL_LOCKS_EXCLUDED(mutex_) { |
| listener_.Listen(updater); |
| } |
| |
| static absl::Nullable<StateChangeWriteReactor*> Detach( |
| absl::Nullable<std::unique_ptr<StateChangeWriteReactor<StateProto>>> |
| reactor) { |
| absl::Nullable<StateChangeWriteReactor*> ptr = reactor.get(); |
| if (ptr == nullptr) return nullptr; |
| ptr->self_ = std::move(reactor); |
| return ptr; |
| } |
| |
| private: |
| void Stop() ABSL_LOCKS_EXCLUDED(mutex_) { stopped_ = true; } |
| |
| void PushUpdate(const StateProto& update) ABSL_LOCKS_EXCLUDED(mutex_) { |
| absl::MutexLock lock(&mutex_); |
| state_queue_.push(update); |
| // If this is not the first item, OnWriteDone will start writing subsequent |
| // items. Otherwise, we need to start writing. |
| if (state_queue_.size() == 1) { |
| StartWrite(&state_queue_.front(), grpc::WriteOptions()); |
| } |
| } |
| |
| absl::Mutex mutex_; |
| std::unique_ptr<StateChangeWriteReactor> self_; |
| bool stopped_ = false; |
| std::queue<StateProto> state_queue_ ABSL_GUARDED_BY(mutex_); |
| // This should be destroyed first so that the listener callbacks will not be |
| // called after any other members in this class are destroyed. |
| UpdateListener listener_; |
| }; |
| |
| } // namespace safepower_agent |
| |
| #endif // PRODUCTION_SUSHID_SAFEPOWER_AGENT_STATE_CHANGE_REACTOR_H_ |