// blob: c8477e501dce663681c38a2e39343abefaab6c14 [file] [log] [blame]
#ifndef PRODUCTION_SUSHID_SAFEPOWER_AGENT_STATE_UPDATER_H_
#define PRODUCTION_SUSHID_SAFEPOWER_AGENT_STATE_UPDATER_H_
#include <list>
#include <memory>
#include "state_merge.h"
#include "absl/base/nullability.h"
#include "absl/base/thread_annotations.h"
#include "absl/log/log.h"
#include "absl/synchronization/mutex.h"
namespace safepower_agent {
template <typename StateProto>
class StateUpdater {
public:
class Listener {
public:
virtual ~Listener() {
std::shared_ptr<StateUpdater> state_updater = state_updater_.lock();
if (state_updater != nullptr) {
state_updater->RemoveListener(itr_);
}
}
void Listen(const absl::Nonnull<std::shared_ptr<StateUpdater>>& updater) {
if (!state_updater_.expired()) {
LOG(DFATAL) << "Listener already connected";
return;
}
state_updater_ = updater;
itr_ = updater->AddListener(*this);
}
virtual void UpdateState(const StateProto& previous_state,
const StateProto& update) = 0;
virtual void Done() = 0;
private:
std::weak_ptr<StateUpdater> state_updater_;
std::list<absl::Nonnull<Listener*>>::iterator itr_;
};
friend class Listener;
explicit StateUpdater(StateProto initial_state = {}, bool final = false)
: state_(std::move(initial_state)), final_(final) {}
~StateUpdater() {
if (!listeners_mutex_.TryLock()) {
LOG(DFATAL) << "StateUpdater destroyed while mutex was held";
return;
}
for (Listener* listener : listeners_) {
listener->Done();
}
listeners_.clear();
listeners_mutex_.Unlock();
}
// The returned reference may change if another thread calls UpdateState; this
// function should generally only be used by the thread that updates the
// state.
const StateProto& state() const {
absl::MutexLock lock(&state_mutex_);
return state_;
}
void UpdateState(const StateProto& update, bool final = false)
ABSL_LOCKS_EXCLUDED(state_mutex_, listeners_mutex_) {
absl::MutexLock lock_state(&state_mutex_);
final_ = final;
absl::MutexLock lock_listeners(&listeners_mutex_);
for (auto listener : listeners_) {
listener->UpdateState(state_, update);
if (final) {
listener->Done();
}
}
ApplyStateUpdate(state_, update);
}
private:
std::list<absl::Nonnull<Listener*>>::iterator AddListener(Listener& listener)
ABSL_LOCKS_EXCLUDED(state_mutex_, listeners_mutex_) {
bool final;
{
absl::MutexLock lock(&state_mutex_);
final = final_; // It is always safe to check for true on this since it
// is never cleared.
listener.UpdateState({}, state_);
}
absl::MutexLock lock(&listeners_mutex_);
if (final) {
listener.Done();
return listeners_.end();
}
listeners_.push_front(&listener);
return listeners_.begin();
}
void RemoveListener(std::list<absl::Nonnull<Listener*>>::iterator handle)
ABSL_LOCKS_EXCLUDED(listeners_mutex_) {
absl::MutexLock lock(&listeners_mutex_);
if (handle == listeners_.end()) return;
listeners_.erase(handle);
}
mutable absl::Mutex state_mutex_ ABSL_ACQUIRED_BEFORE(listeners_mutex_);
absl::Mutex listeners_mutex_;
// This is a std::list so that we have stable iterators
std::list<absl::Nonnull<Listener*>> listeners_
ABSL_GUARDED_BY(listeners_mutex_);
StateProto state_ ABSL_GUARDED_BY(state_mutex_);
bool final_ ABSL_GUARDED_BY(state_mutex_);
};
} // namespace safepower_agent
#endif // PRODUCTION_SUSHID_SAFEPOWER_AGENT_STATE_UPDATER_H_