blob: 7a7a447642bc3993faa63a8b74a2a62659a88c39 [file] [log] [blame] [edit]
#include "channel_server.hpp"
#include <cstddef>
#include <cstdlib>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>
#include <boost/asio.hpp>
#include "absl/log/log.h"
#include "absl/strings/str_split.h"
#include "absl/synchronization/mutex.h"
#include "grpcpp/server_context.h"
#include "grpcpp/support/server_callback.h"
#include "grpcpp/support/status.h"
// Shorthand for the Boost.Asio TCP protocol type; used below for the resolver
// and socket types.
using tcp = boost::asio::ip::tcp;
// Request and response messages exchanged over the bidirectional Channel
// stream.
using google::internal::platforms::remotedebug::v1::ChannelRequest;
using google::internal::platforms::remotedebug::v1::ChannelResponse;
// Size the Reactor's TCP receive buffer is constructed with. A single socket
// read is limited to the buffer's size (presumably bytes -- confirm against
// the declaration of buffer_ in the header).
constexpr int kBufferSize = 10000;
// Constructs the service around an existing io_context. The io_context is
// handed to every Reactor created by Channel(); it is received by reference,
// so it presumably has to outlive this service (confirm against the header).
RemoteDebugServiceImpl::RemoteDebugServiceImpl(
boost::asio::io_context& io_context)
: io_context_(io_context) {}
// Handler for the bidirectional Channel RPC.
//
// Creates one Reactor per incoming call, records a shared_ptr to it in
// reactors_ (guarded by reactors_mutex_) and hands the raw pointer to gRPC.
// The entry is removed again by RemoveReactor(), which the reactor calls from
// OnDone().
grpc::ServerBidiReactor<ChannelRequest, ChannelResponse>*
RemoteDebugServiceImpl::Channel(
    [[maybe_unused]] grpc::CallbackServerContext* context) {
  auto owned_reactor = std::make_shared<Reactor>(*this, io_context_);
  // Grab the raw pointer before ownership is moved into the registry.
  Reactor* const raw_reactor = owned_reactor.get();
  {
    absl::MutexLock lock(reactors_mutex_);
    reactors_.emplace_back(std::move(owned_reactor));
  }
  return raw_reactor;
}
// Drops the service's ownership of `reactor_ptr`.
//
// Every registry entry that points at `reactor_ptr` is erased while holding
// reactors_mutex_. A null pointer is ignored. If the registry held the last
// shared_ptr, the reactor is destroyed as part of the erase.
void RemoteDebugServiceImpl::RemoveReactor(Reactor* reactor_ptr) {
  if (reactor_ptr == nullptr) {
    return;
  }
  const auto owns_target = [reactor_ptr](const auto& candidate) {
    return candidate.get() == reactor_ptr;
  };
  absl::MutexLock lock(reactors_mutex_);
  std::erase_if(reactors_, owns_target);
}
// Builds a reactor that belongs to `owner` and immediately requests the first
// message from the gRPC client. `io_context` backs the TCP socket and is also
// kept for resolving the target address in ConnectToTarget().
RemoteDebugServiceImpl::Reactor::Reactor(RemoteDebugServiceImpl& owner,
boost::asio::io_context& io_context)
: io_context_(io_context),
socket_(io_context),
// Parentheses are deliberate: this constructs the buffer from the size
// kBufferSize (assuming a vector-like buffer_, braces would instead create a
// single element).
buffer_(kBufferSize),
owner_(owner) {
LOG(INFO) << "Creating reactor";
// Ask gRPC for the first client message; the result arrives in OnReadDone().
StartRead(&received_request_);
}
// Logs the destruction; no explicit cleanup is performed here.
RemoteDebugServiceImpl::Reactor::~Reactor() { LOG(INFO) << "Deleting reactor"; }
// Handles completion of a gRPC read started with StartRead().
//
// `ok == false` ends the stream with an OK status. Otherwise the received
// request is dispatched by type:
//   * `connect` while not yet connected: parse "host:port", connect to the
//     target and start the asynchronous TCP read loop.
//   * `data` while connected: forward the complete payload to the TCP socket.
//   * anything else (including `connect` when already connected or `data`
//     before connecting): finish the RPC with INVALID_ARGUMENT.
// After a request has been handled successfully the next gRPC read is
// started; on every failure path the RPC is finished and no further read is
// requested.
void RemoteDebugServiceImpl::Reactor::OnReadDone(bool ok) {
  if (!ok) {
    // On gRPC:
    // "A false ok could mean a failed RPC (e.g. cancellation) or a case
    // where no data is possible because the client has finished its writes."
    // Therefore, to avoid sending failed data to the client, let's finish
    // the server reactor with no errors returned to the grpc client.
    LOG(INFO) << "No more reads from gRPC client. Finishing reactor.";
    MaybeFinish(grpc::Status::OK);
    return;
  }
  if (received_request_.has_connect() && !is_connected_) {
    // Bind by reference: copying the sub-message is unnecessary.
    const auto& connect = received_request_.connect();
    // The target is expected as exactly "host:port".
    const std::vector<std::string> target_address =
        absl::StrSplit(connect.target_address(), ':');
    if (target_address.size() != 2) {
      LOG(ERROR) << "Invalid target address: " << connect.target_address();
      MaybeFinish(grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
                               "Invalid target address."));
      return;
    }
    if (!ConnectToTarget(target_address[0], target_address[1])) {
      MaybeFinish(grpc::Status(grpc::StatusCode::INTERNAL,
                               "Failed to connect to target."));
      return;
    }
    // From now on, data arriving on the socket is streamed back to the client.
    StartTcpRead();
  } else if (received_request_.has_data() && is_connected_) {
    boost::system::error_code ec;
    // boost::asio::write() keeps writing until the whole payload has been
    // sent (or an error occurs). write_some() may transmit only a prefix of
    // the buffer, which would silently drop the remainder of the message.
    boost::asio::write(
        socket_, boost::asio::buffer(received_request_.data().data()), ec);
    if (ec) {
      LOG(ERROR) << "Failed to write to socket: " << ec.message();
      MaybeFinish(grpc::Status(grpc::StatusCode::INTERNAL,
                               "Failed to write to socket."));
      return;
    }
  } else {
    LOG(ERROR) << "Received invalid request from grpc client.";
    MaybeFinish(grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
                             "Received invalid request from grpc client."));
    return;
  }
  // Request handled; wait for the next message from the client.
  StartRead(&received_request_);
}
void RemoteDebugServiceImpl::Reactor::OnWriteDone(bool ok) {
if (!ok) {
LOG(ERROR) << "Failed to write to grpc client.";
MaybeFinish(grpc::Status(grpc::StatusCode::INTERNAL,
"Failed to write to grpc client."));
}
// Inform reactor that grpc write is completed. This is to synchronize the
// start of write N+1 with the finish of write N.
grpc_write_mutex_.lock();
is_grpc_write_completed_ = true;
grpc_write_mutex_.unlock();
}
// gRPC's completion callback for this RPC: asks the owning service to drop
// its shared_ptr to this reactor. If that was the last owner (a pending TCP
// read handler may still hold one), the reactor is destroyed during this
// call, so nothing may touch `this` afterwards.
void RemoteDebugServiceImpl::Reactor::OnDone() {
owner_.RemoveReactor(this);
}
// Starts a gRPC write of `response` unless the reactor has already finished.
//
// The call blocks until the previous gRPC write has been reported through
// OnWriteDone(), which guarantees that at most one write is in flight. The
// finished check and StartWrite() both happen under finished_mutex_.
// NOTE(review): gRPC does not take ownership of the message; the caller
// presumably has to keep `response` alive until OnWriteDone() -- confirm
// against the call sites.
void RemoteDebugServiceImpl::Reactor::MaybeStartWrite(
    ChannelResponse* response) {
  // Wait for the in-flight write (if any) to complete, then own the slot.
  grpc_write_mutex_.LockWhen(absl::Condition(&is_grpc_write_completed_));
  {
    absl::MutexLock finished_lock(finished_mutex_);
    if (!has_finished_) {
      StartWrite(response);
      // Cleared here, raised again by OnWriteDone().
      is_grpc_write_completed_ = false;
    }
  }
  grpc_write_mutex_.unlock();
}
bool RemoteDebugServiceImpl::Reactor::ConnectToTarget(std::string address,
std::string port) {
tcp::resolver resolver(io_context_);
boost::system::error_code ec;
boost::asio::ip::tcp::resolver::results_type endpoints;
endpoints = resolver.resolve(address, port, ec);
if (ec) {
LOG(ERROR) << "Failed to resolve target: " << ec.message();
return false;
}
if (endpoints.empty()) {
LOG(ERROR) << "No endpoint found for target: " << address << ":" << port;
return false;
}
// Try out all available endpoints that are returned from the resolver.
boost::asio::connect(socket_, endpoints, ec);
if (ec) {
LOG(ERROR) << "Failed to connect to target: " << ec.message();
return false;
}
is_connected_ = true;
return true;
}
// Finishes the RPC with `status` exactly once; later calls are no-ops.
//
// Order of operations on the first call:
//   1. Mark the reactor as finished under finished_mutex_. MaybeStartWrite()
//      checks the flag and calls StartWrite() under the same mutex, so once
//      the flag is set no new gRPC write can be started after Finish().
//   2. Tear down the TCP socket (if a connection was established) so that a
//      pending asynchronous read is cancelled.
//   3. Call Finish() with the requested status.
void RemoteDebugServiceImpl::Reactor::MaybeFinish(grpc::Status status) {
  // Call once to ensure that the Finish() function is only called once.
  // call_once runs the callable synchronously, so capturing `status` by
  // reference is safe.
  std::call_once(finished_flag_, [this, &status]() {
    {
      // Flag to indicate that the reactor has finished. This is used to
      // synchronize other threads. It is published before Finish() so that a
      // concurrent MaybeStartWrite() cannot start a write on a finished RPC.
      absl::MutexLock lock(finished_mutex_);
      has_finished_ = true;
    }
    // Closing the socket will cause async reads to be cancelled and therefore
    // the StartTcpRead() function will not be called again.
    if (is_connected_) {
      boost::system::error_code ec;
      // Best effort: shutdown fails (e.g. "not connected") when the peer has
      // already gone away. The socket must be closed regardless.
      socket_.shutdown(tcp::socket::shutdown_both, ec);
      // Use the non-throwing overload; an exception must not escape from a
      // gRPC or io_context callback.
      socket_.close(ec);
      if (ec) {
        LOG(WARNING) << "Failed to close socket: " << ec.message();
      }
    }
    Finish(std::move(status));
  });
}
void RemoteDebugServiceImpl::Reactor::StartTcpRead() {
// Asynchronous reads from the socket were chosen over synchronous reads
// due to:
// 1. Synchronous read would need to be triggered by a thread running parallel
// to the gRPC reactor threads.
// 2. Whenever the gRPC client terminates the connection, the gRPC server
// needs to "delete" the Reactor object on the OnDone callback which would
// need to wait until the thread running the synchronous reads finish. This
// thread would be blocked due to the nature of the synchronous read . This
// would cause a deadlock.
// 3. If the gRPC server decides not to wait for the thread to finish before
// deleting the Reactor, it could potentially cause memory leaks and crashes.
socket_.async_read_some(
boost::asio::buffer(buffer_.data(), buffer_.size()),
[this, self = shared_from_this()](boost::system::error_code ec,
size_t bytes_read) {
if (ec) {
auto read_status = grpc::Status(grpc::StatusCode::INTERNAL,
"Failed to read from socket.");
if (ec == boost::asio::error::eof) {
// EOF is received when the TCP server closes the connection.
// The Reactor needs to be closed, otherwise the stream will remain
// open until the client closes it.
read_status = grpc::Status::OK;
}
LOG(INFO) << "Finished reading from socket with error: "
<< ec.message();
MaybeFinish(read_status);
return;
}
ChannelResponse response;
response.mutable_data()->set_data(buffer_.data(), bytes_read);
MaybeStartWrite(&response);
absl::MutexLock lock(finished_mutex_);
if (has_finished_) {
return;
}
StartTcpRead();
});
}