blob: 636cb733e1d6aa756a50d046e72f8b5edda6cbb7 [file] [log] [blame]
#ifndef PRODUCTION_SUSHID_SAFEPOWER_AGENT_STATE_CHANGE_REACTOR_H_
#define PRODUCTION_SUSHID_SAFEPOWER_AGENT_STATE_CHANGE_REACTOR_H_
#include <memory>
#include <queue>
#include "state_updater.h"
#include "absl/base/nullability.h"
#include "absl/base/thread_annotations.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/synchronization/mutex.h"
#include "grpcpp/impl/call_op_set.h"
#include "grpcpp/support/server_callback.h"
#include "grpcpp/support/status.h"
namespace safepower_agent {
template <typename StateProto>
class StateChangeWriteReactor : public grpc::ServerWriteReactor<StateProto> {
class UpdateListener : public StateUpdater<StateProto>::Listener {
public:
explicit UpdateListener(StateChangeWriteReactor<StateProto>& reactor)
: reactor_(reactor) {}
void UpdateState(const StateProto& /*current_state*/,
const StateProto& update) override {
reactor_.PushUpdate(update);
}
void Done() override { reactor_.Stop(); }
private:
StateChangeWriteReactor<StateProto>& reactor_;
};
friend class UpdateListener;
using grpc::ServerWriteReactor<StateProto>::StartWrite;
using grpc::ServerWriteReactor<StateProto>::StartWriteLast;
public:
StateChangeWriteReactor() : listener_(*this) {};
~StateChangeWriteReactor() override { Stop(); }
using grpc::ServerWriteReactor<StateProto>::Finish;
void OnDone() override { self_.reset(); }
void OnCancel() override {
Stop();
Finish(grpc::Status::CANCELLED);
}
void OnWriteDone(bool ok) ABSL_LOCKS_EXCLUDED(mutex_) override {
if (!ok) {
LOG(ERROR) << "Write failed";
Stop();
Finish(grpc::Status(grpc::StatusCode::UNKNOWN, "Unexpected failure"));
return;
}
absl::MutexLock lock(&mutex_);
CHECK(!state_queue_.empty());
state_queue_.pop();
if (state_queue_.empty()) {
if (stopped_) {
Finish(grpc::Status::OK);
}
return;
}
StartWrite(&state_queue_.front(), grpc::WriteOptions());
}
void Connect(const
std::shared_ptr<StateUpdater<StateProto>>& updater)
ABSL_LOCKS_EXCLUDED(mutex_) {
listener_.Listen(updater);
}
static StateChangeWriteReactor* Detach(
std::unique_ptr<StateChangeWriteReactor<StateProto>>
reactor) {
StateChangeWriteReactor* ptr = reactor.get();
if (ptr == nullptr) return nullptr;
ptr->self_ = std::move(reactor);
return ptr;
}
private:
void Stop() ABSL_LOCKS_EXCLUDED(mutex_) { stopped_ = true; }
void PushUpdate(const StateProto& update) ABSL_LOCKS_EXCLUDED(mutex_) {
absl::MutexLock lock(&mutex_);
state_queue_.push(update);
// If this is not the first item, OnWriteDone will start writing subsequent
// items. Otherwise, we need to start writing.
if (state_queue_.size() == 1) {
StartWrite(&state_queue_.front(), grpc::WriteOptions());
}
}
absl::Mutex mutex_;
std::unique_ptr<StateChangeWriteReactor> self_;
bool stopped_ = false;
std::queue<StateProto> state_queue_ ABSL_GUARDED_BY(mutex_);
// This should be destroyed first so that the listener callbacks will not be
// called after any other members in this class are destroyed.
UpdateListener listener_;
};
} // namespace safepower_agent
#endif // PRODUCTION_SUSHID_SAFEPOWER_AGENT_STATE_CHANGE_REACTOR_H_