| #include "sse_plugin/collector_storage_manager.h" |
| |
| #include <vector> |
| |
| #include "absl/log/log.h" |
| #include "absl/status/status.h" |
| #include "absl/strings/str_cat.h" |
| #include "absl/strings/string_view.h" |
| #include "absl/synchronization/mutex.h" |
| #include "redfish_query_engine/http/codes.h" |
| #include "sse_plugin/event.h" |
| #include "events.pb.h" |
| #include "voyager/voyager_telemetry.pb.h" |
| |
| namespace milotic { |
| |
| using ecclesia::HttpResponseCode::HTTP_CODE_REQUEST_OK; |
| using third_party_voyager::DataPoint; |
| using third_party_voyager::Update; |
| |
| absl::StatusOr<std::vector<Event>> |
| RemoteCollectorStorageManager::RetrieveProcessedEvents() { |
| // wait for list of existing events and subscribe call from Borg |
| absl::MutexLock lock(&mutex_); |
| int current_reset_count = reset_count_; |
| AwaitProcessedEvents(); |
| AwaitEventHandler(); |
| |
| if (reset_count_ != current_reset_count) |
| return absl::UnavailableError("storage manager is reset"); |
| return events_; |
| } |
| |
| absl::Status RemoteCollectorStorageManager::SaveEvent(const Event &event) { |
| absl::MutexLock lock(&mutex_); |
| if (!CheckEventHandler()) { |
| return absl::UnavailableError("event handler is not available"); |
| } |
| auto event_proto = event.ToProto(); |
| Update update_msg; |
| |
| update_msg.set_code(HTTP_CODE_REQUEST_OK); |
| DataPoint *dp = update_msg.add_data_points(); |
| dp->mutable_native()->PackFrom(event_proto); |
| return event_handler_->OnEvent(update_msg); |
| } |
| |
| absl::Status RemoteCollectorStorageManager::SaveCperEvent(const Event &event) { |
| absl::MutexLock lock(&mutex_); |
| if (!CheckEventHandler()) { |
| return absl::UnavailableError("event handler is not available"); |
| } |
| |
| platforms_vbmc::CperEvent cper_event; |
| cper_event.set_event_json(event.event_json()); |
| platforms_vbmc::OtsUpdate ots_update; |
| *ots_update.mutable_cper_event() = cper_event; |
| Update update_msg; |
| update_msg.set_code(HTTP_CODE_REQUEST_OK); |
| DataPoint *dp = update_msg.add_data_points(); |
| dp->mutable_native()->PackFrom(ots_update); |
| |
| return event_handler_->OnEvent(update_msg); |
| } |
| |
| absl::Status RemoteCollectorStorageManager::SaveLogs( |
| const Event &event, absl::string_view contents) { |
| absl::MutexLock lock(&mutex_); |
| if (!CheckEventHandler()) { |
| return absl::UnavailableError("event handler is not available"); |
| } |
| Update update_msg; |
| update_msg.set_code(HTTP_CODE_REQUEST_OK); |
| |
| platforms_vbmc::OtsUpdate ots_update; |
| ots_update.mutable_log()->set_log_message(contents); |
| ots_update.mutable_log()->set_event_id(absl::StrCat(event.event_id())); |
| |
| DataPoint *dp = update_msg.add_data_points(); |
| dp->mutable_native()->PackFrom(ots_update); |
| return event_handler_->OnEvent(update_msg); |
| } |
| |
| absl::Status RemoteCollectorStorageManager::ArchiveProcessedEvents() { |
| absl::MutexLock lock(&mutex_); |
| if (!CheckEventHandler()) { |
| return absl::UnavailableError("event handler is not available"); |
| } |
| events_.clear(); |
| Update update_msg; |
| update_msg.set_code(HTTP_CODE_REQUEST_OK); |
| |
| platforms_vbmc::OtsUpdate ots_update; |
| ots_update.set_ots_health_ok(true); |
| |
| DataPoint *dp = update_msg.add_data_points(); |
| dp->mutable_native()->PackFrom(ots_update); |
| return event_handler_->OnEvent(update_msg); |
| } |
| |
| absl::Status RemoteCollectorStorageManager::SendHeartbeat() { |
| absl::MutexLock lock(&mutex_); |
| if (!CheckEventHandler()) { |
| return absl::UnavailableError("event handler is not available"); |
| } |
| Update update_msg; |
| update_msg.set_code(HTTP_CODE_REQUEST_OK); |
| |
| platforms_vbmc::OtsUpdate ots_update; |
| |
| DataPoint *dp = update_msg.add_data_points(); |
| dp->mutable_native()->PackFrom(ots_update); |
| return event_handler_->OnEvent(update_msg); |
| } |
| |
| RemoteCollectorStorageManager::~RemoteCollectorStorageManager() { |
| LOG(INFO) << "In ~RemoteCollectorStorageManager"; |
| // disconnect from Borg SSE connection |
| } |
| |
| } // namespace milotic |