blob: 4561f85e9223b358ef185b6c31f21472247215a5 [file] [log] [blame]
#ifndef PRODUCTION_BORG_MGMT_NODE_PROXY_SAFEPOWER_SAFEPOWER_AGENT_STATE_UPDATER_H_
#define PRODUCTION_BORG_MGMT_NODE_PROXY_SAFEPOWER_SAFEPOWER_AGENT_STATE_UPDATER_H_
#include <algorithm>
#include <array>
#include <cstddef>
#include <list>
#include <memory>
#include <utility>
#include <vector>
#include "callback_manager.h"
#include "state_merge.h"
#include "absl/base/nullability.h"
#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include "absl/log/log.h"
#include "absl/synchronization/mutex.h"
namespace safepower_agent {
template <typename StateProto>
class StateUpdater {
public:
class Listener {
struct StateUpdaterRef {
std::weak_ptr<StateUpdater> ptr;
StateUpdater* raw_ptr; // For compares in destructor.
typename std::list<Listener* >::iterator itr;
};
public:
virtual ~Listener() {
for (const StateUpdaterRef& ref : state_updater_) {
std::shared_ptr<StateUpdater> state_updater = ref.ptr.lock();
if (state_updater != nullptr) {
state_updater->RemoveListener(ref.itr);
}
}
}
template <typename... Args>
void Listen(const Args&... args) {
static_assert(sizeof...(args) > 0);
if (!state_updater_.empty()) {
LOG(DFATAL) << "Listener::Listen called multiple times";
return;
}
state_updater_.reserve(sizeof...(args));
std::array done = {Insert(args)...};
if (std::all_of(done.begin(), done.end(), [](bool b) { return b; })) {
Done();
}
}
virtual void UpdateState(const StateProto& previous_state,
const StateProto& update) = 0;
void MaybeDone(const StateUpdater& updater) {
for (StateUpdaterRef& ref : state_updater_) {
if (ref.raw_ptr == &updater) {
ref.raw_ptr = nullptr;
++done_count_;
if (done_count_ >= state_updater_.size()) {
Done();
}
return;
}
}
LOG(DFATAL) << "Listener not listening to this updater";
}
protected:
virtual void Done() = 0;
private:
bool Insert(const std::shared_ptr<StateUpdater>& updater) {
for (const StateUpdaterRef& ref : state_updater_) {
if (ref.ptr.lock() == updater) {
LOG(DFATAL) << "Listener already listening to this updater";
return true;
}
}
auto [itr, final] = updater->AddListener(*this);
state_updater_.push_back({updater, updater.get(), itr});
return final;
}
std::vector<StateUpdaterRef> state_updater_;
size_t done_count_ = 0;
};
friend class Listener;
explicit StateUpdater(StateProto initial_state = {}, bool final_state = false)
: state_(std::move(initial_state)), final_(final_state) {}
~StateUpdater() {
if (!listeners_mutex_.try_lock()) {
LOG(DFATAL) << "StateUpdater destroyed while mutex was held";
return;
}
for (Listener* listener : listeners_) {
listener->MaybeDone(*this);
}
listeners_.clear();
listeners_mutex_.unlock();
}
// The returned reference may change if another thread calls UpdateState; this
// function should generally only be used by the thread that updates the
// state.
const StateProto& state() const {
absl::MutexLock lock(state_mutex_);
return state_;
}
void UpdateState(const StateProto& update, bool final_state = false)
ABSL_LOCKS_EXCLUDED(state_mutex_, listeners_mutex_) {
absl::MutexLock lock_state(state_mutex_);
final_ = final_state;
absl::MutexLock lock_listeners(listeners_mutex_);
for (auto listener : listeners_) {
listener->UpdateState(state_, update);
if (final_state) {
listener->MaybeDone(*this);
}
}
ApplyStateUpdate(state_, update);
}
// Runs a callback when the updater has no more listeners. The callback will
// be called immediately if the updater is already idle. The move-only
// returned handle can be destroyed to cancel the callback.
// Idle callbacks are run in LIFO order, while active callbacks are run in
// FIFO order. This reduces the likelihood of deadlocks: For example, if a
// resource is acquired when active and released when idle, another pair of
// callbacks that are added later will always see that resource acquired.
CallbackManager::Handle OnIdle(absl::AnyInvocable<void() &&> callback)
ABSL_LOCKS_EXCLUDED(listeners_mutex_) {
if (IsIdle()) {
std::move(callback)();
return {};
}
return activity_callbacks_.RunFirst(std::move(callback));
}
// Runs a callback when the updater has at least one listener. The callback
// will be called immediately if the updater already has listeners. See OnIdle
// for more details.
CallbackManager::Handle OnActive(absl::AnyInvocable<void() &&> callback)
ABSL_LOCKS_EXCLUDED(listeners_mutex_) {
if (!IsIdle()) {
std::move(callback)();
return {};
}
return activity_callbacks_.RunLast(std::move(callback));
}
private:
bool IsIdle() const ABSL_LOCKS_EXCLUDED(listeners_mutex_) {
absl::MutexLock lock(listeners_mutex_);
return listeners_.empty();
}
std::pair<typename std::list<Listener* >::iterator, bool>
AddListener(Listener& listener)
ABSL_LOCKS_EXCLUDED(state_mutex_, listeners_mutex_) {
bool final_state;
{
absl::MutexLock lock(state_mutex_);
final_state = final_;
// It is always safe to check for true on this since it is never cleared.
listener.UpdateState({}, state_);
}
bool was_empty;
std::pair<typename std::list<Listener* >::iterator, bool>
result;
{
absl::MutexLock lock(listeners_mutex_);
was_empty = listeners_.empty();
listeners_.push_front(&listener);
result = std::make_pair(listeners_.begin(), final_state);
}
if (was_empty) {
activity_callbacks_.RunCallbacks();
}
return result;
}
void RemoveListener(std::list<Listener* >::iterator handle)
ABSL_LOCKS_EXCLUDED(listeners_mutex_) {
bool becomes_empty = false;
{
absl::MutexLock lock(listeners_mutex_);
if (handle == listeners_.end()) return;
listeners_.erase(handle);
becomes_empty = listeners_.empty();
}
if (becomes_empty) {
activity_callbacks_.RunCallbacks();
}
}
mutable absl::Mutex state_mutex_ ABSL_ACQUIRED_BEFORE(listeners_mutex_);
mutable absl::Mutex listeners_mutex_;
// This is a std::list so that we have stable iterators
std::list<Listener* > listeners_
ABSL_GUARDED_BY(listeners_mutex_);
StateProto state_ ABSL_GUARDED_BY(state_mutex_);
bool final_ ABSL_GUARDED_BY(state_mutex_);
// Callbacks that are run when the updater becomes idle (no listeners) or
// active (at least one listener).
CallbackManager activity_callbacks_;
};
} // namespace safepower_agent
#endif // PRODUCTION_BORG_MGMT_NODE_PROXY_SAFEPOWER_SAFEPOWER_AGENT_STATE_UPDATER_H_