| #ifndef PLATFORMS_VBMC_PLATFORM_EIGER_STORAGE_MANAGER_H_ |
| #define PLATFORMS_VBMC_PLATFORM_EIGER_STORAGE_MANAGER_H_ |
| |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "absl/base/thread_annotations.h" |
| #include "absl/log/log.h" |
| #include "absl/status/status.h" |
| #include "absl/status/statusor.h" |
| #include "absl/strings/string_view.h" |
| #include "absl/synchronization/mutex.h" |
| #include "absl/time/clock.h" |
| #include "absl/time/time.h" |
| #include "proxy.h" |
| #include "redfish_plugin.h" |
| #include "sse_plugin/event.h" |
| |
| namespace milotic { |
| |
| // EventStorageManager abstracts access (read/write) to an event storage |
| // for events and logs. |
| class EventStorageManager { |
| public: |
| virtual ~EventStorageManager() = default; |
| virtual absl::Status SaveEvent(const Event& event) = 0; |
| virtual absl::Status SaveCperEvent(const Event& event) = 0; |
| virtual absl::Status SaveLogs(const Event& event, |
| absl::string_view contents) = 0; |
| |
| // Retrieve stored processed event to replay and bootstrap machine state |
| virtual absl::StatusOr<std::vector<Event>> RetrieveProcessedEvents() = 0; |
| |
| // Archive stored processed events so that they are not sent during next call |
| // to RetrieveProcessedEvents |
| virtual absl::Status ArchiveProcessedEvents() = 0; |
| virtual absl::Status SendHeartbeat() = 0; |
| void SetProxy(Proxy* proxy) { proxy_ = proxy; } |
| void SetEventHandler(RedfishPlugin::EventHandler* event_handler) |
| ABSL_LOCKS_EXCLUDED(mutex_) { |
| absl::MutexLock lock(&mutex_); |
| event_handler_ = event_handler; |
| disconnect_ = false; |
| } |
| void SetProcessedEvents(std::vector<Event> events) |
| ABSL_LOCKS_EXCLUDED(mutex_) { |
| absl::MutexLock lock(&mutex_); |
| events_ = std::move(events); |
| processed_events_ready_ = true; |
| } |
| void AwaitProcessedEvents() ABSL_SHARED_LOCKS_REQUIRED(mutex_) { |
| int current_reset_count = reset_count_; |
| auto check_processed_events = |
| [this, current_reset_count]() ABSL_SHARED_LOCKS_REQUIRED(mutex_) { |
| return processed_events_ready_ || reset_count_ != current_reset_count; |
| }; |
| mutex_.Await(absl::Condition(&check_processed_events)); |
| } |
| |
| void AwaitEventHandler() ABSL_SHARED_LOCKS_REQUIRED(mutex_) { |
| int current_reset_count = reset_count_; |
| auto check_handler = [this, current_reset_count]() |
| ABSL_SHARED_LOCKS_REQUIRED(mutex_) { |
| return event_handler_ != nullptr || |
| reset_count_ != current_reset_count; |
| }; |
| mutex_.Await(absl::Condition(&check_handler)); |
| } |
| bool CheckEventHandler() ABSL_SHARED_LOCKS_REQUIRED(mutex_) { |
| return event_handler_ != nullptr; |
| } |
| void Disconnect() ABSL_LOCKS_EXCLUDED(mutex_) { |
| absl::MutexLock lock(&mutex_); |
| disconnect_ = true; |
| } |
| void ResetEventHandler() ABSL_LOCKS_EXCLUDED(mutex_) { |
| absl::MutexLock lock(&mutex_); |
| event_handler_ = nullptr; |
| } |
| virtual void AwaitDisconnect() ABSL_LOCKS_EXCLUDED(mutex_) { |
| absl::MutexLock lock(&mutex_); |
| mutex_.Await(absl::Condition(&disconnect_)); |
| } |
| void SetConnectionId(absl::string_view connection_id) |
| ABSL_LOCKS_EXCLUDED(mutex_) { |
| absl::MutexLock lock(&mutex_); |
| connection_id_ = connection_id; |
| } |
| bool CheckConnectionId(absl::string_view connection_id) |
| ABSL_LOCKS_EXCLUDED(mutex_) { |
| absl::MutexLock lock(&mutex_); |
| return connection_id_ == connection_id; |
| } |
| bool ConnectionIdIsEmpty() ABSL_LOCKS_EXCLUDED(mutex_) { |
| absl::MutexLock lock(&mutex_); |
| return connection_id_.empty(); |
| } |
| std::string ConnectionId() ABSL_LOCKS_EXCLUDED(mutex_) { |
| absl::MutexLock lock(&mutex_); |
| return connection_id_; |
| } |
| |
| void Reset() ABSL_LOCKS_EXCLUDED(mutex_) { |
| absl::MutexLock lock(&mutex_); |
| reset_count_++; |
| events_.clear(); |
| disconnect_ = false; |
| processed_events_ready_ = false; |
| connection_id_.clear(); |
| event_handler_ = nullptr; |
| } |
| |
| protected: |
| void AwaitProxyInit() { |
| while ((proxy_ == nullptr) || !proxy_->IsInitialized()) { |
| absl::SleepFor(absl::Seconds(0.5)); |
| } |
| } |
| |
| absl::Mutex mutex_; |
| Proxy* proxy_ = nullptr; |
| std::vector<Event> events_ ABSL_GUARDED_BY(mutex_); |
| bool disconnect_ ABSL_GUARDED_BY(mutex_) = false; |
| int reset_count_ ABSL_GUARDED_BY(mutex_) = 0; |
| bool processed_events_ready_ ABSL_GUARDED_BY(mutex_) = false; |
| std::string connection_id_ ABSL_GUARDED_BY(mutex_); |
| RedfishPlugin::EventHandler* event_handler_ ABSL_GUARDED_BY(mutex_) = nullptr; |
| }; |
| |
| // Used for tests |
| class FakeStorageManager : public EventStorageManager { |
| public: |
| explicit FakeStorageManager(bool disconnect = false) { |
| disconnect_ = disconnect; |
| } |
| absl::Status SaveEvent(const Event& event) override { |
| return absl::OkStatus(); |
| } |
| absl::Status SaveCperEvent(const Event& event) override { |
| return absl::OkStatus(); |
| } |
| absl::Status SaveLogs(const Event& event, |
| absl::string_view contents) override { |
| return absl::OkStatus(); |
| } |
| absl::Status SendHeartbeat() override { return absl::OkStatus(); } |
| absl::StatusOr<std::vector<Event>> RetrieveProcessedEvents() override { |
| absl::MutexLock lock(&mutex_); |
| AwaitProxyInit(); |
| return events_; |
| } |
| absl::Status ArchiveProcessedEvents() override { |
| absl::MutexLock lock(&mutex_); |
| events_.clear(); |
| return absl::OkStatus(); |
| } |
| |
| void SetProcessedEvents(std::vector<Event> events) { |
| absl::MutexLock lock(&mutex_); |
| events_ = std::move(events); |
| } |
| |
| void AwaitDisconnect() override {} |
| }; |
| |
| // Used for tests |
| class LogStorageManager : public EventStorageManager { |
| public: |
| explicit LogStorageManager(bool disconnect = false) { |
| disconnect_ = disconnect; |
| } |
| absl::Status SaveEvent(const Event& event) override { |
| sse_events_.push_back(event); |
| LOG(INFO) << "SSE event: " << event.event_json(); |
| return absl::OkStatus(); |
| } |
| absl::Status SaveCperEvent(const Event& event) override { |
| sse_events_.push_back(event); |
| LOG(INFO) << "SSE CPER event: " << event.event_json(); |
| return absl::OkStatus(); |
| } |
| absl::Status SaveLogs(const Event& event, |
| absl::string_view contents) override { |
| LOG(INFO) << "SSE logs for event" << event.event_id() << " : " << contents; |
| return absl::OkStatus(); |
| } |
| absl::Status SendHeartbeat() override { |
| LOG(INFO) << "SSE heartbeat"; |
| return absl::OkStatus(); |
| } |
| absl::StatusOr<std::vector<Event>> RetrieveProcessedEvents() override { |
| AwaitProxyInit(); |
| return replay_events_; |
| } |
| absl::Status ArchiveProcessedEvents() override { |
| replay_events_.clear(); |
| return absl::OkStatus(); |
| } |
| |
| void SetProcessedEvents(std::vector<Event> events) { |
| replay_events_ = std::move(events); |
| } |
| |
| private: |
| std::vector<Event> replay_events_; |
| std::vector<Event> sse_events_; |
| }; |
| |
| } // namespace milotic |
| |
| #endif // PLATFORMS_VBMC_PLATFORM_EIGER_STORAGE_MANAGER_H_ |