Refactor address lookup to use a future, and logic checking if the future is ready or not.
At no point during the execution does the logic perform a blocking operation.
PiperOrigin-RevId: 831064173
Change-Id: I9b295f113977d723afafce5d25893ed1a3d231ab
diff --git a/bmc/address_lookup.cc b/bmc/address_lookup.cc
index 47d0ba1..9e24b48 100644
--- a/bmc/address_lookup.cc
+++ b/bmc/address_lookup.cc
@@ -6,7 +6,10 @@
#include <cerrno>
#include <cstdint>
#include <cstring>
-#include <future> // NOLINT(build/c++11) Runs on BMC.
+// NOLINTBEGIN(readability/boost) Runs on BMC.
+#include <chrono> // NOLINT(build/c++11)
+#include <future> // NOLINT(build/c++11)
+// NOLINTEND(readability/boost)
#include <memory>
#include <string>
#include <utility>
@@ -19,9 +22,12 @@
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
-// NOLINTBEGIN(readability/boost) Runs on BMC.
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
+// NOLINTBEGIN(readability/boost) Runs on BMC.
+#include "boost/asio/io_context.hpp"
+#include "boost/asio/io_service.hpp"
+#include "boost/date_time/posix_time/posix_time.hpp"
#include "boost/process.hpp"
#include "boost/system.hpp"
#include "bmc/status_macros.h"
@@ -29,6 +35,9 @@
namespace safepower_agent {
+constexpr std::chrono::milliseconds kIPCommandTimeout
+ = std::chrono::milliseconds(100);
+
static absl::Status SetLow64(in6_addr& addr,
absl::string_view node_entity_tag) {
const production_msv::node_entities_proto::ResolvedNodeEntityConfiguration&
@@ -117,12 +126,51 @@
LOG(DFATAL) << "Failed to find null terminator in address string: "
<< result;
return absl::InternalError(absl::StrCat(
- "Failed to find null terminator in address string: ", result));
+ "Failed to find null terminator in address string: ", result));
}
result.erase(len);
return result;
}
+void AwaitFuture(std::unique_ptr<boost::asio::steady_timer> timer,
+ std::unique_ptr<std::future<std::string>> stdout_future,
+ absl::AnyInvocable<void(absl::StatusOr<std::string>) &&>
+ callback) {
+ auto stdout_result = stdout_future->wait_for(std::chrono::milliseconds(0));
+ std::string stdout;
+ if (stdout_result == std::future_status::ready) {
+ // NOLINTNEXTLINE(misc-include-cleaner)
+ try { // future.get() will throw an exception, an exception
+ // if an exception was saved in the promise.
+ stdout = std::move(*stdout_future).get();
+ // NOLINTNEXTLINE(misc-include-cleaner)
+ } catch (...) {
+ LOG(ERROR) << "Failed to wait for future: ";
+ std::move(callback)(absl::UnavailableError(
+ ("Failed to wait for future: ")));
+ return;
+ }
+ std::move(callback)(stdout);
+ return;
+ }
+ timer->expires_at(boost::asio::chrono::steady_clock::now() +
+ kIPCommandTimeout);
+ timer->async_wait([timer = std::move(timer),
+ stdout_future = std::move(stdout_future),
+ callback = std::move(callback)](
+ const boost::system::error_code& ec) mutable {
+ // error handling
+ if (ec) {
+ LOG(ERROR) << "Timer wait failed: " << ec.message();
+ std::move(callback)(absl::UnavailableError(
+ absl::StrCat("Timer wait failed: ", ec.message())));
+ return;
+ }
+ AwaitFuture(std::move(timer), std::move(stdout_future),
+ std::move(callback));
+ });
+}
+
void LookupAddress(
std::string node_entity_tag,
absl::AnyInvocable<void(absl::StatusOr<std::string>) &&> callback) {
@@ -135,30 +183,40 @@
// Default args for ip command.
args = {"-6", "addr", "show", "scope", "global", "-deprecated"};
}
-
- // Use a streambuf to capture stdout asynchronously.
- // TODO: b/448994383 - Fix load address blocking io context
- auto stdout_buffer = std::make_shared<boost::asio::streambuf>();
-
+ auto stdout_future = std::make_unique<std::future<std::string>>();
+ auto& stdout_future_ref = *stdout_future;
+ auto timer = std::make_unique<boost::asio::steady_timer>(
+ daemon_context.get_io_context());
+ // It is possible for the async_system to run the handler before the
+ // future is ready. When this happens, we wait async until the future is
+ // ready.
boost::process::async_system(
daemon_context.get_io_context(),
- [node_entity_tag = std::move(node_entity_tag),
- callback = std::move(callback), stdout_buffer](
- boost::system::error_code ec, int exit_code) mutable {
- // callback runs after output is copied to stdout_str.
- std::string stdout_str;
- const char* data =
- boost::asio::buffer_cast<const char*>(stdout_buffer->data());
- stdout_str.assign(data, stdout_buffer->size());
-
- // Now call ProcessAddressLookupResult with the captured stdout.
- std::move(callback)(ProcessAddressLookupResult(
- node_entity_tag, ec, exit_code, std::move(stdout_str)));
+ [timer = std::move(timer), node_entity_tag,
+ callback = std::move(callback),
+ stdout_future = std::move(stdout_future)](boost::system::error_code ec,
+ int exit_code) mutable {
+ if (ec.failed()) {
+ LOG(ERROR) << "Failed to run ip command: " << ec.message();
+ std::move(callback)(absl::FailedPreconditionError(
+ absl::StrCat("Failed to run ip command: ", ec.message())));
+ return;
+ }
+ AwaitFuture(std::move(timer),
+ std::move(stdout_future),
+ [callback = std::move(callback), node_entity_tag, exit_code]
+ (absl::StatusOr<std::string> stdout) mutable {
+ if (stdout.ok()){
+ std::move(callback)(ProcessAddressLookupResult(
+ node_entity_tag, {}, exit_code, *std::move(stdout)));
+ } else {
+ std::move(callback)(stdout.status());
+ }
+ });
},
boost::process::exe = address_lookup_config.command(),
- boost::process::args = args,
- boost::process::std_out > *stdout_buffer // Pipe output to streambuf
- );
+ boost::process::args = args, boost::process::std_out > stdout_future_ref);
}
+
} // namespace safepower_agent