Add mutex protection to BaseMachineState members.

This change introduces a mutex to guard the state variables within BaseMachineState and its derived classes, such as health, timestamps, and event IDs. This ensures thread safety when multiple threads access or modify these state variables. Additionally, calls to Shutdown() are added to destructors where necessary to ensure proper cleanup.

PiperOrigin-RevId: 889887042
Change-Id: I7c2278ed073444af0e563e0ae3ee661803b376ed
diff --git a/cper/manager.cc b/cper/manager.cc
index cad1201..5cf7475 100644
--- a/cper/manager.cc
+++ b/cper/manager.cc
@@ -281,6 +281,7 @@
 }
 
 CperEventsManager::~CperEventsManager() {
+  Shutdown();
   // Make sure there is no in-flight remapping
   shutdown_.Notify();
   remapping_ready_.WaitForNotification();
@@ -557,13 +558,6 @@
 bool CperMachineState::UpdateStatus(Event& event, bool is_replay) {
   VLOG(kVlogVerbosity) << "UpdateStatus event id: " << event.event_id();
   const Event& cache_event = event;
-  if (!is_replay && event.event_id() != last_event_id_ + 1) {
-    LOG(ERROR) << "LastEventId was " << last_event_id_
-               << " new event received with id " << event.event_id();
-  }
-  if (!is_replay && event.event_id() > last_event_id_) {
-    last_event_id_ = event.event_id();
-  }
 
   if (event.severity() == EventSeverity::kCritical &&
       event.cper_event_data().has_value()) {
@@ -573,8 +567,17 @@
     }
   }
 
+  absl::MutexLock lock(state_mutex_);
+  if (!is_replay && event.event_id() != last_event_id_ + 1) {
+    LOG(ERROR) << "LastEventId was " << last_event_id_
+               << " new event received with id " << event.event_id();
+  }
+  if (!is_replay && event.event_id() > last_event_id_) {
+    last_event_id_ = event.event_id();
+  }
+
   timestamp_last_event_ = absl::Now();
-  MachineHealth prev_health = health();
+  MachineHealth prev_health = health_;
   MachineHealth new_health = prev_health;
   switch (cache_event.severity()) {
     case EventSeverity::kOk:
@@ -596,7 +599,7 @@
 
   // Update values on health transition.
   if (new_health != prev_health) {
-    UpdateHealth(new_health);
+    UpdateHealthLocked(new_health);
     timestamp_last_health_transition_ = timestamp_last_event_;
     event_last_health_transition_ = cache_event;
   }
@@ -614,11 +617,13 @@
   json_event["EventId"] = "-1";
   json_event["EventType"] = "vbmc.Internal";
   json_event["Message"] = event_message;
+
+  absl::MutexLock lock(state_mutex_);
   if (severity == EventSeverity::kCritical) {
-    UpdateHealth(MachineHealth::kCritical);
+    UpdateHealthLocked(MachineHealth::kCritical);
   }
   if (severity == EventSeverity::kWarning) {
-    UpdateHealth(MachineHealth::kWarning);
+    UpdateHealthLocked(MachineHealth::kWarning);
   }
   timestamp_last_health_transition_ = absl::Now();
   event_last_health_transition_ =
diff --git a/sse_plugin/events_manager.cc b/sse_plugin/events_manager.cc
index 9544299..bac7e90 100644
--- a/sse_plugin/events_manager.cc
+++ b/sse_plugin/events_manager.cc
@@ -262,6 +262,10 @@
     stream_thread_->Join();
     stream_thread_ = nullptr;
   }
+  if (main_thread_ != nullptr) {
+    main_thread_->Join();
+    main_thread_ = nullptr;
+  }
 }
 
 void BaseEventsManager::Reset() {
@@ -315,13 +319,7 @@
   exit(1);
 }
 
-BaseEventsManager::~BaseEventsManager() {
-  Shutdown();
-  if (main_thread_ != nullptr) {
-    main_thread_->Join();
-    main_thread_ = nullptr;
-  }
-}
+BaseEventsManager::~BaseEventsManager() { Shutdown(); }
 
 void BaseEventsManager::FetchAndSaveLogs() {
   absl::StatusOr<std::string> log_contents = GetRedfishLogs();
@@ -471,9 +469,9 @@
   serving_state_timeout_sec_ = timeout_sec;
 }
 
-// Return the timestamp and events details when health changed to !OK last in
-// json format.
-nlohmann::json BaseMachineState::LastHealthChangeDetails() const {
+nlohmann::json BaseMachineState::LastHealthChangeDetails() const
+    ABSL_LOCKS_EXCLUDED(state_mutex_) {
+  absl::MutexLock lock(state_mutex_);
   nlohmann::json health_change;
   health_change["Timestamp"] = "";
   health_change["EventId"] = "";
diff --git a/sse_plugin/events_manager.h b/sse_plugin/events_manager.h
index 9e4239d..8667b4d 100644
--- a/sse_plugin/events_manager.h
+++ b/sse_plugin/events_manager.h
@@ -97,39 +97,63 @@
 // MachineState maintains the health status of the machine based on new events.
 class BaseMachineState {
  public:
-  explicit BaseMachineState() { UpdateHealth(MachineHealth::kOk); }
+  explicit BaseMachineState() {
+    absl::MutexLock lock(state_mutex_);
+    UpdateHealthLocked(MachineHealth::kOk);
+  }
 
   virtual bool UpdateStatus(milotic::Event& event, bool is_replay) = 0;
   // Used to update the health and status of the machine for internal events.
   virtual void UpdateInternalEvent(absl::string_view event_message,
                                    milotic::EventSeverity severity) = 0;
-  bool IsHealthy() { return health_ == MachineHealth::kOk; }
-  MachineHealth health() const { return health_; }
+  bool IsHealthy() const ABSL_LOCKS_EXCLUDED(state_mutex_) {
+    absl::MutexLock lock(state_mutex_);
+    return health_ == MachineHealth::kOk;
+  }
+  MachineHealth health() const ABSL_LOCKS_EXCLUDED(state_mutex_) {
+    absl::MutexLock lock(state_mutex_);
+    return health_;
+  }
   // TODO: b/293358466 - update timestamp_last_health_transition_ and
   // event_last_health_transition_.
   void MarkHealthy(
-      const std::source_location& location = std::source_location::current()) {
-    UpdateHealth(MachineHealth::kOk, location);
+      const std::source_location& location = std::source_location::current())
+      ABSL_LOCKS_EXCLUDED(state_mutex_) {
+    absl::MutexLock lock(state_mutex_);
+    UpdateHealthLocked(MachineHealth::kOk, location);
   }
   void Reset(
-      const std::source_location& location = std::source_location::current()) {
-    UpdateHealth(MachineHealth::kOk, location);
+      const std::source_location& location = std::source_location::current())
+      ABSL_LOCKS_EXCLUDED(state_mutex_) {
+    absl::MutexLock lock(state_mutex_);
+    UpdateHealthLocked(MachineHealth::kOk, location);
     timestamp_last_event_ = absl::UnixEpoch();
     timestamp_last_health_transition_ = absl::UnixEpoch();
     event_last_health_transition_ = milotic::Event();
   }
-  int64_t LastEventId() const { return last_event_id_; }
-  void SetLastEventId(int64_t event_id) { last_event_id_ = event_id; }
-  nlohmann::json LastHealthChangeDetails() const;
-  const milotic::Event& LastHealthChangeEvent() const {
+  int64_t LastEventId() const ABSL_LOCKS_EXCLUDED(state_mutex_) {
+    absl::MutexLock lock(state_mutex_);
+    return last_event_id_;
+  }
+  void SetLastEventId(int64_t event_id) ABSL_LOCKS_EXCLUDED(state_mutex_) {
+    absl::MutexLock lock(state_mutex_);
+    last_event_id_ = event_id;
+  }
+  nlohmann::json LastHealthChangeDetails() const
+      ABSL_LOCKS_EXCLUDED(state_mutex_);
+  milotic::Event LastHealthChangeEvent() const
+      ABSL_LOCKS_EXCLUDED(state_mutex_) {
+    absl::MutexLock lock(state_mutex_);
     return event_last_health_transition_;
   }
 
   virtual ~BaseMachineState() = default;
 
  protected:
-  void UpdateHealth(MachineHealth health, const std::source_location& location =
-                                              std::source_location::current()) {
+  void UpdateHealthLocked(
+      MachineHealth health,
+      const std::source_location& location = std::source_location::current())
+      ABSL_EXCLUSIVE_LOCKS_REQUIRED(state_mutex_) {
     LOG(INFO).AtLocation(location.file_name(), static_cast<int>(location.line()))
         << "MachineState updated to " << MachineHealthToString(health);
     CommonMetrics::Get().events_manager_health.SetState(
@@ -137,11 +161,12 @@
     health_ = health;
   }
 
-  MachineHealth health_;
-  absl::Time timestamp_last_event_;
-  absl::Time timestamp_last_health_transition_;
-  milotic::Event event_last_health_transition_;
-  int64_t last_event_id_ = 0;
+  mutable absl::Mutex state_mutex_;
+  MachineHealth health_ ABSL_GUARDED_BY(state_mutex_);
+  absl::Time timestamp_last_event_ ABSL_GUARDED_BY(state_mutex_);
+  absl::Time timestamp_last_health_transition_ ABSL_GUARDED_BY(state_mutex_);
+  milotic::Event event_last_health_transition_ ABSL_GUARDED_BY(state_mutex_);
+  int64_t last_event_id_ ABSL_GUARDED_BY(state_mutex_) = 0;
 };
 
 // Handles redfish events in machines which provides a SSE connection to BMC.