blob: 0257d70821e7ca6fc41be3385f648f4d608173a4 [file] [log] [blame]
#ifndef PRODUCTION_BORG_MGMT_NODE_PROXY_SAFEPOWER_SAFEPOWER_AGENT_STATE_CHANGE_REACTOR_H_
#define PRODUCTION_BORG_MGMT_NODE_PROXY_SAFEPOWER_SAFEPOWER_AGENT_STATE_CHANGE_REACTOR_H_
#include <memory>
#include <queue>
#include "daemon_context.h"
#include "disruption_manager.h"
#include "state_updater.h"
#include "absl/base/nullability.h"
#include "absl/base/thread_annotations.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/synchronization/mutex.h"
#include "grpcpp/impl/call_op_set.h"
#include "grpcpp/support/server_callback.h"
#include "grpcpp/support/status.h"
namespace safepower_agent {
// Server-side streaming write reactor that forwards StateProto updates to a
// gRPC client.
//
// Updates arrive through an internal StateUpdater listener (see Connect), are
// copied into a FIFO queue and written to the stream one at a time: a write is
// started when the queue goes from empty to non-empty, and each successful
// OnWriteDone pops the written item and starts the next one. The front of the
// queue is therefore the message handed to StartWrite and must stay in place
// until the matching OnWriteDone.
//
// The stream is finished exactly once:
//   * with OK once the reactor has been stopped and the queue has drained, or
//   * with CANCELLED when a write fails.
//
// Lifetime: a reactor passed through Detach() owns itself and is destroyed
// from OnDone().
template <typename StateProto>
class StateChangeWriteReactor : public grpc::ServerWriteReactor<StateProto> {
  // Adapter that forwards StateUpdater notifications to the owning reactor.
  class UpdateListener : public StateUpdater<StateProto>::Listener {
   public:
    explicit UpdateListener(StateChangeWriteReactor<StateProto>& reactor)
        : reactor_(reactor) {}

    // Queues `update` for writing; the current full state is ignored.
    void UpdateState(const StateProto& /*current_state*/,
                     const StateProto& update) override {
      reactor_.PushUpdate(update);
    }

    // The updater has no more updates: stop the stream without waiting.
    void Done() override { reactor_.Stop(); }

   private:
    StateChangeWriteReactor<StateProto>& reactor_;
  };
  friend class UpdateListener;

  using grpc::ServerWriteReactor<StateProto>::StartWrite;
  using grpc::ServerWriteReactor<StateProto>::StartWriteLast;

 public:
  StateChangeWriteReactor() : listener_(*this) {}

  using grpc::ServerWriteReactor<StateProto>::Finish;

  // Releases the self-ownership established by Detach(). If Detach() was used
  // this destroys the reactor, so nothing may touch `this` afterwards.
  // unique_ptr::reset() clears the stored pointer before invoking the deleter,
  // so self_ is already null while the destructor runs.
  void OnDone() override { self_.reset(); }

  // The RPC was cancelled: stop accepting updates. Does not block.
  void OnCancel() override {
    LOG(INFO) << "Stream cancelled";
    Stop(/*wait=*/false);
  }

  // Completion callback for the write started on the front of the queue.
  //
  // ok == true:  pops the written item, then either starts the next write or,
  //              if the queue is now empty and the reactor is stopped,
  //              finishes the stream with OK.
  // ok == false: marks the reactor stopped, discards everything still queued
  //              and finishes the stream with CANCELLED.
  void OnWriteDone(bool ok) ABSL_LOCKS_EXCLUDED(mutex_) override {
    if (!ok) {
      LOG(ERROR) << "Write closed";
      {
        absl::MutexLock lock(&mutex_);
        stopped_ = true;
        // No further write will complete, so nothing would ever pop the
        // remaining items. Drop them here; otherwise a caller blocked in
        // Stop(/*wait=*/true) would wait for the queue to drain forever.
        std::queue<StateProto>().swap(state_queue_);
      }
      // With stopped_ set and no write outstanding, no other path in this
      // class calls Finish, so the stream is finished exactly once.
      Finish(grpc::Status(grpc::StatusCode::CANCELLED, "Write closed"));
      return;
    }
    absl::MutexLock lock(&mutex_);
    // A successful write implies the written item is still at the front.
    CHECK(!state_queue_.empty());
    state_queue_.pop();
    if (state_queue_.empty()) {
      if (stopped_) {
        LOG(INFO) << "Stream stopped";
        Finish(grpc::Status::OK);
      }
      return;
    }
    StartWrite(&state_queue_.front(), grpc::WriteOptions());
  }

  // Subscribes to `updater` and registers a disruption-start callback that
  // stops the stream and blocks until all queued updates have been handed off.
  //
  // The callback captures `this`; the registration is held in
  // disruption_callback_handle_.
  // NOTE(review): presumably destroying the handle unregisters the callback;
  // confirm against DisruptionManager.
  void Connect(const std::shared_ptr<StateUpdater<StateProto>>& updater)
      ABSL_LOCKS_EXCLUDED(mutex_) {
    listener_.Listen(updater);
    disruption_callback_handle_ =
        DaemonContext::Get().disruption_manager().OnDisruptionStart([this] {
          LOG(INFO) << "Stopping stream because disruption is expected";
          Stop(/*wait=*/true);
        });
  }

  // Transfers ownership of `reactor` to the reactor itself and returns a raw,
  // non-owning pointer to it. The reactor is destroyed in OnDone(). Returns
  // nullptr if `reactor` is null.
  static StateChangeWriteReactor* Detach(
      std::unique_ptr<StateChangeWriteReactor<StateProto>> reactor) {
    StateChangeWriteReactor* ptr = reactor.get();
    if (ptr == nullptr) return nullptr;
    ptr->self_ = std::move(reactor);
    return ptr;
  }

  // Returns true once the reactor has stopped accepting updates.
  bool stopped() const ABSL_LOCKS_EXCLUDED(mutex_) {
    absl::MutexLock lock(&mutex_);
    return stopped_;
  }

 private:
  // Stops accepting new updates. On the first call, if nothing is queued, the
  // stream is finished with OK immediately; otherwise OnWriteDone finishes it
  // once the queue drains. Later calls do not finish the stream again.
  //
  // If `wait` is true, blocks until the queue is empty.
  void Stop(bool wait = false) ABSL_LOCKS_EXCLUDED(mutex_) {
    absl::MutexLock lock(&mutex_);
    if (!stopped_) {
      stopped_ = true;
      if (state_queue_.empty()) {
        LOG(INFO) << "Stream stopped";
        Finish(grpc::Status::OK);
      }
    }
    if (!wait) return;
    // Await releases mutex_ while blocked and returns with it held once the
    // condition holds.
    mutex_.Await(absl::Condition(
        +[](std::queue<StateProto>* state_queue) {
          return state_queue->empty();
        },
        &state_queue_));
  }

  // Copies `update` onto the queue, starting a write if none is in flight.
  // Updates received after the reactor has stopped are dropped with a warning.
  void PushUpdate(const StateProto& update) ABSL_LOCKS_EXCLUDED(mutex_) {
    absl::MutexLock lock(&mutex_);
    if (stopped_) {
      LOG(WARNING) << "Dropping update because the stream is stopped";
      return;
    }
    state_queue_.push(update);
    // If this is not the first item, OnWriteDone will start writing subsequent
    // items. Otherwise, we need to start writing.
    if (state_queue_.size() == 1) {
      StartWrite(&state_queue_.front(), grpc::WriteOptions());
    }
  }

  mutable absl::Mutex mutex_;
  // Self-owning pointer set by Detach() and released in OnDone().
  std::unique_ptr<StateChangeWriteReactor> self_;
  // True once no further updates are accepted.
  bool stopped_ ABSL_GUARDED_BY(mutex_) = false;
  // Pending updates; the front element is the one currently being written.
  std::queue<StateProto> state_queue_ ABSL_GUARDED_BY(mutex_);
  // Registration of the disruption-start callback installed by Connect().
  DisruptionManager::CallbackHandle disruption_callback_handle_;
  // This should be destroyed first so that the listener callbacks will not be
  // called after any other members in this class are destroyed. It is declared
  // last for that reason.
  UpdateListener listener_;
};
} // namespace safepower_agent
#endif // PRODUCTION_BORG_MGMT_NODE_PROXY_SAFEPOWER_SAFEPOWER_AGENT_STATE_CHANGE_REACTOR_H_