| #ifndef PRODUCTION_SUSHID_SAFEPOWER_AGENT_STATE_UPDATER_H_ |
| #define PRODUCTION_SUSHID_SAFEPOWER_AGENT_STATE_UPDATER_H_ |
| |
| #include <list> |
| #include <memory> |
| |
| #include "state_merge.h" |
| #include "absl/base/nullability.h" |
| #include "absl/base/thread_annotations.h" |
| #include "absl/log/log.h" |
| #include "absl/synchronization/mutex.h" |
| |
| namespace safepower_agent { |
| template <typename StateProto> |
| class StateUpdater { |
| public: |
| class Listener { |
| public: |
| virtual ~Listener() { |
| std::shared_ptr<StateUpdater> state_updater = state_updater_.lock(); |
| if (state_updater != nullptr) { |
| state_updater->RemoveListener(itr_); |
| } |
| } |
| |
| void Listen(const absl::Nonnull<std::shared_ptr<StateUpdater>>& updater) { |
| if (!state_updater_.expired()) { |
| LOG(DFATAL) << "Listener already connected"; |
| return; |
| } |
| state_updater_ = updater; |
| itr_ = updater->AddListener(*this); |
| } |
| |
| virtual void UpdateState(const StateProto& previous_state, |
| const StateProto& update) = 0; |
| virtual void Done() = 0; |
| |
| private: |
| std::weak_ptr<StateUpdater> state_updater_; |
| std::list<absl::Nonnull<Listener*>>::iterator itr_; |
| }; |
| friend class Listener; |
| |
| explicit StateUpdater(StateProto initial_state = {}, bool final = false) |
| : state_(std::move(initial_state)), final_(final) {} |
| |
| ~StateUpdater() { |
| if (!listeners_mutex_.TryLock()) { |
| LOG(DFATAL) << "StateUpdater destroyed while mutex was held"; |
| return; |
| } |
| for (Listener* listener : listeners_) { |
| listener->Done(); |
| } |
| listeners_.clear(); |
| listeners_mutex_.Unlock(); |
| } |
| |
| // The returned reference may change if another thread calls UpdateState; this |
| // function should generally only be used by the thread that updates the |
| // state. |
| const StateProto& state() const { |
| absl::MutexLock lock(&state_mutex_); |
| return state_; |
| } |
| |
| void UpdateState(const StateProto& update, bool final = false) |
| ABSL_LOCKS_EXCLUDED(state_mutex_, listeners_mutex_) { |
| absl::MutexLock lock_state(&state_mutex_); |
| final_ = final; |
| absl::MutexLock lock_listeners(&listeners_mutex_); |
| for (auto listener : listeners_) { |
| listener->UpdateState(state_, update); |
| if (final) { |
| listener->Done(); |
| } |
| } |
| ApplyStateUpdate(state_, update); |
| } |
| |
| private: |
| std::list<absl::Nonnull<Listener*>>::iterator AddListener(Listener& listener) |
| ABSL_LOCKS_EXCLUDED(state_mutex_, listeners_mutex_) { |
| bool final; |
| { |
| absl::MutexLock lock(&state_mutex_); |
| final = final_; // It is always safe to check for true on this since it |
| // is never cleared. |
| listener.UpdateState({}, state_); |
| } |
| |
| absl::MutexLock lock(&listeners_mutex_); |
| if (final) { |
| listener.Done(); |
| return listeners_.end(); |
| } |
| listeners_.push_front(&listener); |
| return listeners_.begin(); |
| } |
| |
| void RemoveListener(std::list<absl::Nonnull<Listener*>>::iterator handle) |
| ABSL_LOCKS_EXCLUDED(listeners_mutex_) { |
| absl::MutexLock lock(&listeners_mutex_); |
| if (handle == listeners_.end()) return; |
| listeners_.erase(handle); |
| } |
| |
| mutable absl::Mutex state_mutex_ ABSL_ACQUIRED_BEFORE(listeners_mutex_); |
| absl::Mutex listeners_mutex_; |
| // This is a std::list so that we have stable iterators |
| std::list<absl::Nonnull<Listener*>> listeners_ |
| ABSL_GUARDED_BY(listeners_mutex_); |
| StateProto state_ ABSL_GUARDED_BY(state_mutex_); |
| bool final_ ABSL_GUARDED_BY(state_mutex_); |
| }; |
| |
| } // namespace safepower_agent |
| |
| #endif // PRODUCTION_SUSHID_SAFEPOWER_AGENT_STATE_UPDATER_H_ |