blob: 8e4b763f20573153416cccae85e2281c972db006 [file] [log] [blame]
#include "sse_plugin/collector_storage_manager.h"
#include <vector>
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
#include "redfish_query_engine/http/codes.h"
#include "sse_plugin/event.h"
#include "events.pb.h"
#include "voyager/voyager_telemetry.pb.h"
namespace milotic {
using ecclesia::HttpResponseCode::HTTP_CODE_REQUEST_OK;
using third_party_voyager::DataPoint;
using third_party_voyager::Update;
absl::StatusOr<std::vector<Event>>
RemoteCollectorStorageManager::RetrieveProcessedEvents() {
// wait for list of existing events and subscribe call from Borg
absl::MutexLock lock(&mutex_);
int current_reset_count = reset_count_;
AwaitProcessedEvents();
AwaitEventHandler();
if (reset_count_ != current_reset_count)
return absl::UnavailableError("storage manager is reset");
return events_;
}
absl::Status RemoteCollectorStorageManager::SaveEvent(const Event &event) {
absl::MutexLock lock(&mutex_);
if (!CheckEventHandler()) {
return absl::UnavailableError("event handler is not available");
}
auto event_proto = event.ToProto();
Update update_msg;
update_msg.set_code(HTTP_CODE_REQUEST_OK);
DataPoint *dp = update_msg.add_data_points();
dp->mutable_native()->PackFrom(event_proto);
return event_handler_->OnEvent(update_msg);
}
absl::Status RemoteCollectorStorageManager::SaveCperEvent(const Event &event) {
absl::MutexLock lock(&mutex_);
if (!CheckEventHandler()) {
return absl::UnavailableError("event handler is not available");
}
platforms_vbmc::CperEvent cper_event;
cper_event.set_event_json(event.event_json());
platforms_vbmc::OtsUpdate ots_update;
*ots_update.mutable_cper_event() = cper_event;
Update update_msg;
update_msg.set_code(HTTP_CODE_REQUEST_OK);
DataPoint *dp = update_msg.add_data_points();
dp->mutable_native()->PackFrom(ots_update);
return event_handler_->OnEvent(update_msg);
}
absl::Status RemoteCollectorStorageManager::SaveLogs(
const Event &event, absl::string_view contents) {
absl::MutexLock lock(&mutex_);
if (!CheckEventHandler()) {
return absl::UnavailableError("event handler is not available");
}
Update update_msg;
update_msg.set_code(HTTP_CODE_REQUEST_OK);
platforms_vbmc::OtsUpdate ots_update;
ots_update.mutable_log()->set_log_message(contents);
ots_update.mutable_log()->set_event_id(absl::StrCat(event.event_id()));
DataPoint *dp = update_msg.add_data_points();
dp->mutable_native()->PackFrom(ots_update);
return event_handler_->OnEvent(update_msg);
}
absl::Status RemoteCollectorStorageManager::ArchiveProcessedEvents() {
absl::MutexLock lock(&mutex_);
if (!CheckEventHandler()) {
return absl::UnavailableError("event handler is not available");
}
events_.clear();
Update update_msg;
update_msg.set_code(HTTP_CODE_REQUEST_OK);
platforms_vbmc::OtsUpdate ots_update;
ots_update.set_ots_health_ok(true);
DataPoint *dp = update_msg.add_data_points();
dp->mutable_native()->PackFrom(ots_update);
return event_handler_->OnEvent(update_msg);
}
absl::Status RemoteCollectorStorageManager::SendHeartbeat() {
absl::MutexLock lock(&mutex_);
if (!CheckEventHandler()) {
return absl::UnavailableError("event handler is not available");
}
Update update_msg;
update_msg.set_code(HTTP_CODE_REQUEST_OK);
platforms_vbmc::OtsUpdate ots_update;
DataPoint *dp = update_msg.add_data_points();
dp->mutable_native()->PackFrom(ots_update);
return event_handler_->OnEvent(update_msg);
}
RemoteCollectorStorageManager::~RemoteCollectorStorageManager() {
LOG(INFO) << "In ~RemoteCollectorStorageManager";
// disconnect from Borg SSE connection
}
} // namespace milotic