| #ifndef PRODUCTION_BORG_MGMT_NODE_PROXY_SAFEPOWER_SAFEPOWER_AGENT_STATE_UPDATER_H_ |
| #define PRODUCTION_BORG_MGMT_NODE_PROXY_SAFEPOWER_SAFEPOWER_AGENT_STATE_UPDATER_H_ |
| |
| #include <algorithm> |
| #include <array> |
| #include <cstddef> |
| #include <list> |
| #include <memory> |
| #include <utility> |
| #include <vector> |
| |
| #include "callback_manager.h" |
| #include "state_merge.h" |
| #include "absl/base/nullability.h" |
| #include "absl/base/thread_annotations.h" |
| #include "absl/functional/any_invocable.h" |
| #include "absl/log/log.h" |
| #include "absl/synchronization/mutex.h" |
| |
| namespace safepower_agent { |
| template <typename StateProto> |
| class StateUpdater { |
| public: |
| class Listener { |
| struct StateUpdaterRef { |
| std::weak_ptr<StateUpdater> ptr; |
| StateUpdater* raw_ptr; // For compares in destructor. |
| typename std::list<Listener* >::iterator itr; |
| }; |
| |
| public: |
| virtual ~Listener() { |
| for (const StateUpdaterRef& ref : state_updater_) { |
| std::shared_ptr<StateUpdater> state_updater = ref.ptr.lock(); |
| if (state_updater != nullptr) { |
| state_updater->RemoveListener(ref.itr); |
| } |
| } |
| } |
| |
| template <typename... Args> |
| void Listen(const Args&... args) { |
| static_assert(sizeof...(args) > 0); |
| if (!state_updater_.empty()) { |
| LOG(DFATAL) << "Listener::Listen called multiple times"; |
| return; |
| } |
| state_updater_.reserve(sizeof...(args)); |
| std::array done = {Insert(args)...}; |
| if (std::all_of(done.begin(), done.end(), [](bool b) { return b; })) { |
| Done(); |
| } |
| } |
| |
| virtual void UpdateState(const StateProto& previous_state, |
| const StateProto& update) = 0; |
| |
| void MaybeDone(const StateUpdater& updater) { |
| for (StateUpdaterRef& ref : state_updater_) { |
| if (ref.raw_ptr == &updater) { |
| ref.raw_ptr = nullptr; |
| ++done_count_; |
| if (done_count_ >= state_updater_.size()) { |
| Done(); |
| } |
| return; |
| } |
| } |
| LOG(DFATAL) << "Listener not listening to this updater"; |
| } |
| |
| protected: |
| virtual void Done() = 0; |
| |
| private: |
| bool Insert(const std::shared_ptr<StateUpdater>& updater) { |
| for (const StateUpdaterRef& ref : state_updater_) { |
| if (ref.ptr.lock() == updater) { |
| LOG(DFATAL) << "Listener already listening to this updater"; |
| return true; |
| } |
| } |
| auto [itr, final] = updater->AddListener(*this); |
| state_updater_.push_back({updater, updater.get(), itr}); |
| return final; |
| } |
| |
| std::vector<StateUpdaterRef> state_updater_; |
| size_t done_count_ = 0; |
| }; |
| friend class Listener; |
| |
| explicit StateUpdater(StateProto initial_state = {}, bool final_state = false) |
| : state_(std::move(initial_state)), final_(final_state) {} |
| |
| ~StateUpdater() { |
| if (!listeners_mutex_.try_lock()) { |
| LOG(DFATAL) << "StateUpdater destroyed while mutex was held"; |
| return; |
| } |
| for (Listener* listener : listeners_) { |
| listener->MaybeDone(*this); |
| } |
| listeners_.clear(); |
| listeners_mutex_.unlock(); |
| } |
| |
| // The returned reference may change if another thread calls UpdateState; this |
| // function should generally only be used by the thread that updates the |
| // state. |
| const StateProto& state() const { |
| absl::MutexLock lock(state_mutex_); |
| return state_; |
| } |
| |
| void UpdateState(const StateProto& update, bool final_state = false) |
| ABSL_LOCKS_EXCLUDED(state_mutex_, listeners_mutex_) { |
| absl::MutexLock lock_state(state_mutex_); |
| final_ = final_state; |
| absl::MutexLock lock_listeners(listeners_mutex_); |
| for (auto listener : listeners_) { |
| listener->UpdateState(state_, update); |
| if (final_state) { |
| listener->MaybeDone(*this); |
| } |
| } |
| ApplyStateUpdate(state_, update); |
| } |
| |
| // Runs a callback when the updater has no more listeners. The callback will |
| // be called immediately if the updater is already idle. The move-only |
| // returned handle can be destroyed to cancel the callback. |
| // Idle callbacks are run in LIFO order, while active callbacks are run in |
| // FIFO order. This reduces the likelihood of deadlocks: For example, if a |
| // resource is acquired when active and released when idle, another pair of |
| // callbacks that are added later will always see that resource acquired. |
| CallbackManager::Handle OnIdle(absl::AnyInvocable<void() &&> callback) |
| ABSL_LOCKS_EXCLUDED(listeners_mutex_) { |
| if (IsIdle()) { |
| std::move(callback)(); |
| return {}; |
| } |
| return activity_callbacks_.RunFirst(std::move(callback)); |
| } |
| |
| // Runs a callback when the updater has at least one listener. The callback |
| // will be called immediately if the updater already has listeners. See OnIdle |
| // for more details. |
| CallbackManager::Handle OnActive(absl::AnyInvocable<void() &&> callback) |
| ABSL_LOCKS_EXCLUDED(listeners_mutex_) { |
| if (!IsIdle()) { |
| std::move(callback)(); |
| return {}; |
| } |
| return activity_callbacks_.RunLast(std::move(callback)); |
| } |
| |
| private: |
| bool IsIdle() const ABSL_LOCKS_EXCLUDED(listeners_mutex_) { |
| absl::MutexLock lock(listeners_mutex_); |
| return listeners_.empty(); |
| } |
| |
| std::pair<typename std::list<Listener* >::iterator, bool> |
| AddListener(Listener& listener) |
| ABSL_LOCKS_EXCLUDED(state_mutex_, listeners_mutex_) { |
| bool final_state; |
| { |
| absl::MutexLock lock(state_mutex_); |
| final_state = final_; |
| // It is always safe to check for true on this since it is never cleared. |
| listener.UpdateState({}, state_); |
| } |
| |
| bool was_empty; |
| std::pair<typename std::list<Listener* >::iterator, bool> |
| result; |
| { |
| absl::MutexLock lock(listeners_mutex_); |
| was_empty = listeners_.empty(); |
| listeners_.push_front(&listener); |
| result = std::make_pair(listeners_.begin(), final_state); |
| } |
| if (was_empty) { |
| activity_callbacks_.RunCallbacks(); |
| } |
| return result; |
| } |
| |
| void RemoveListener(std::list<Listener* >::iterator handle) |
| ABSL_LOCKS_EXCLUDED(listeners_mutex_) { |
| bool becomes_empty = false; |
| { |
| absl::MutexLock lock(listeners_mutex_); |
| if (handle == listeners_.end()) return; |
| listeners_.erase(handle); |
| becomes_empty = listeners_.empty(); |
| } |
| if (becomes_empty) { |
| activity_callbacks_.RunCallbacks(); |
| } |
| } |
| |
| mutable absl::Mutex state_mutex_ ABSL_ACQUIRED_BEFORE(listeners_mutex_); |
| mutable absl::Mutex listeners_mutex_; |
| // This is a std::list so that we have stable iterators |
| std::list<Listener* > listeners_ |
| ABSL_GUARDED_BY(listeners_mutex_); |
| StateProto state_ ABSL_GUARDED_BY(state_mutex_); |
| bool final_ ABSL_GUARDED_BY(state_mutex_); |
| // Callbacks that are run when the updater becomes idle (no listeners) or |
| // active (at least one listener). |
| CallbackManager activity_callbacks_; |
| }; |
| |
| } // namespace safepower_agent |
| |
| #endif // PRODUCTION_BORG_MGMT_NODE_PROXY_SAFEPOWER_SAFEPOWER_AGENT_STATE_UPDATER_H_ |