| #include "channel_server.hpp" |
| |
| #include <cstddef> |
| #include <cstdlib> |
| #include <memory> |
| #include <mutex> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include <boost/asio.hpp> |
| #include "absl/log/log.h" |
| #include "absl/strings/str_split.h" |
| #include "absl/synchronization/mutex.h" |
| #include "grpcpp/server_context.h" |
| #include "grpcpp/support/server_callback.h" |
| #include "grpcpp/support/status.h" |
| |
| using tcp = boost::asio::ip::tcp; |
| using google::internal::platforms::remotedebug::v1::ChannelRequest; |
| using google::internal::platforms::remotedebug::v1::ChannelResponse; |
| |
// Size in bytes of the scratch buffer used for each TCP socket read.
constexpr int kBufferSize = 10000;
| |
// Constructs the service. |io_context| is held by reference and handed to
// every Reactor for socket I/O, so it must outlive this service.
RemoteDebugServiceImpl::RemoteDebugServiceImpl(
    boost::asio::io_context& io_context)
    : io_context_(io_context) {}
| |
| grpc::ServerBidiReactor<ChannelRequest, ChannelResponse>* |
| RemoteDebugServiceImpl::Channel( |
| [[maybe_unused]] grpc::CallbackServerContext* context) { |
| auto reactor = std::make_shared<Reactor>(*this, io_context_); |
| auto reactor_ptr = reactor.get(); |
| absl::MutexLock lock(reactors_mutex_); |
| reactors_.push_back(std::move(reactor)); |
| return reactor_ptr; |
| } |
| |
| void RemoteDebugServiceImpl::RemoveReactor(Reactor* reactor_ptr) { |
| if (!reactor_ptr) return; |
| absl::MutexLock lock(reactors_mutex_); |
| // Remove the reactor from the vector. |
| std::erase_if(reactors_, [reactor_ptr](const auto& reactor) { |
| return reactor.get() == reactor_ptr; |
| }); |
| } |
| |
// Creates a reactor for one Channel RPC and immediately arms the first
// inbound gRPC read; the first request is expected to be a Connect (see
// OnReadDone). Note: actual member initialization order follows the
// declaration order in the header, not this list.
RemoteDebugServiceImpl::Reactor::Reactor(RemoteDebugServiceImpl& owner,
                                         boost::asio::io_context& io_context)
    : io_context_(io_context),
      socket_(io_context),
      buffer_(kBufferSize),
      owner_(owner) {
  LOG(INFO) << "Creating reactor";
  // OnReadDone(ok) fires when the client's first message arrives.
  StartRead(&received_request_);
}
| |
| RemoteDebugServiceImpl::Reactor::~Reactor() { LOG(INFO) << "Deleting reactor"; } |
| |
| void RemoteDebugServiceImpl::Reactor::OnReadDone(bool ok) { |
| if (!ok) { |
| // On gRPC: |
| // "A false ok could mean a failed RPC (e.g. cancellation) or a case |
| // where no data is possible because the client has finished its writes." |
| // Therefore, to avoid sending failed data to the client, let's finish |
| // the server reactor with no errors returned to the grpc client. |
| LOG(INFO) << "No more reads from gRPC client. Finishing reactor."; |
| MaybeFinish(grpc::Status::OK); |
| return; |
| } |
| if (received_request_.has_connect() && !is_connected_) { |
| auto connect = received_request_.connect(); |
| std::vector<std::string> target_address = |
| absl::StrSplit(connect.target_address(), ':'); |
| if (target_address.size() != 2) { |
| LOG(ERROR) << "Invalid target address: " << connect.target_address(); |
| MaybeFinish(grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, |
| "Invalid target address.")); |
| return; |
| } |
| if (!ConnectToTarget(target_address[0], target_address[1])) { |
| MaybeFinish(grpc::Status(grpc::StatusCode::INTERNAL, |
| "Failed to connect to target.")); |
| return; |
| } |
| StartTcpRead(); |
| } else if (received_request_.has_data() && is_connected_) { |
| boost::system::error_code ec; |
| socket_.write_some(boost::asio::buffer(received_request_.data().data()), |
| ec); |
| if (ec) { |
| LOG(ERROR) << "Failed to write to socket: " << ec.message(); |
| MaybeFinish(grpc::Status(grpc::StatusCode::INTERNAL, |
| "Failed to write to socket.")); |
| return; |
| } |
| } else { |
| LOG(ERROR) << "Received invalid request from grpc client."; |
| MaybeFinish(grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, |
| "Received invalid request from grpc client.")); |
| return; |
| } |
| StartRead(&received_request_); |
| } |
| |
| void RemoteDebugServiceImpl::Reactor::OnWriteDone(bool ok) { |
| if (!ok) { |
| LOG(ERROR) << "Failed to write to grpc client."; |
| MaybeFinish(grpc::Status(grpc::StatusCode::INTERNAL, |
| "Failed to write to grpc client.")); |
| } |
| // Inform reactor that grpc write is completed. This is to synchronize the |
| // start of write N+1 with the finish of write N. |
| grpc_write_mutex_.lock(); |
| is_grpc_write_completed_ = true; |
| grpc_write_mutex_.unlock(); |
| } |
| |
// Final gRPC callback for this RPC. Removing the reactor from the owner's
// registry drops the owning shared_ptr and may destroy |this|, so no
// member state may be touched after RemoveReactor() returns.
void RemoteDebugServiceImpl::Reactor::OnDone() {
  owner_.RemoveReactor(this);
}
| |
// Starts a gRPC write of |response| unless the reactor has already
// finished, in which case the write is silently dropped. Serializes
// writes: blocks until OnWriteDone has acknowledged the previous
// StartWrite.
// NOTE(review): gRPC's StartWrite does not copy the message — |response|
// must remain valid until OnWriteDone fires; confirm every caller keeps
// the message alive that long.
void RemoteDebugServiceImpl::Reactor::MaybeStartWrite(
    ChannelResponse* response) {
  // Block until the write is allowed. This is to ensure we don't start
  // multiple grpc writes without waiting for the previous write to
  // finish.
  grpc_write_mutex_.LockWhen(absl::Condition(&is_grpc_write_completed_));
  absl::MutexLock lock(finished_mutex_);
  if (has_finished_) {
    // Reactor already finished: drop the write. The completion flag stays
    // true, so later callers will not block.
    grpc_write_mutex_.unlock();
    return;
  }
  StartWrite(response);
  // Clear the flag while still holding grpc_write_mutex_; OnWriteDone
  // sets it back to true when this write completes.
  is_grpc_write_completed_ = false;
  grpc_write_mutex_.unlock();
}
| |
| bool RemoteDebugServiceImpl::Reactor::ConnectToTarget(std::string address, |
| std::string port) { |
| tcp::resolver resolver(io_context_); |
| boost::system::error_code ec; |
| boost::asio::ip::tcp::resolver::results_type endpoints; |
| endpoints = resolver.resolve(address, port, ec); |
| if (ec) { |
| LOG(ERROR) << "Failed to resolve target: " << ec.message(); |
| return false; |
| } |
| if (endpoints.empty()) { |
| LOG(ERROR) << "No endpoint found for target: " << address << ":" << port; |
| return false; |
| } |
| // Try out all available endpoints that are returned from the resolver. |
| boost::asio::connect(socket_, endpoints, ec); |
| if (ec) { |
| LOG(ERROR) << "Failed to connect to target: " << ec.message(); |
| return false; |
| } |
| is_connected_ = true; |
| return true; |
| } |
| |
// Finishes the gRPC reactor exactly once with |status|. Closes the TCP
// socket first so pending async reads are cancelled, then marks the
// reactor finished so other threads (MaybeStartWrite, the TCP read
// handler) stop scheduling work. Safe to call from multiple threads;
// only the first call has any effect.
void RemoteDebugServiceImpl::Reactor::MaybeFinish(grpc::Status status) {
  // Call once to ensure that the Finish() function is only called once.
  std::call_once(finished_flag_, [this, status]() {
    // Closing the socket will cause async reads to be cancelled and therefore
    // the StartTcpRead() function will not be called again.
    if (is_connected_) {
      boost::system::error_code ec;
      // Shut down both directions; skip close() if shutdown failed (the
      // socket may already be gone).
      socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
      if (!ec) socket_.close();
    }
    Finish(status);
    absl::MutexLock lock(finished_mutex_);
    // Flag to indicate that the reactor has finished. This is used to
    // synchronize other threads.
    has_finished_ = true;
  });
}
| |
| void RemoteDebugServiceImpl::Reactor::StartTcpRead() { |
| // Asynchronous reads from the socket were chosen over synchronous reads |
| // due to: |
| // 1. Synchronous read would need to be triggered by a thread running parallel |
| // to the gRPC reactor threads. |
| // 2. Whenever the gRPC client terminates the connection, the gRPC server |
| // needs to "delete" the Reactor object on the OnDone callback which would |
| // need to wait until the thread running the synchronous reads finish. This |
| // thread would be blocked due to the nature of the synchronous read . This |
| // would cause a deadlock. |
| // 3. If the gRPC server decides not to wait for the thread to finish before |
| // deleting the Reactor, it could potentially cause memory leaks and crashes. |
| socket_.async_read_some( |
| boost::asio::buffer(buffer_.data(), buffer_.size()), |
| [this, self = shared_from_this()](boost::system::error_code ec, |
| size_t bytes_read) { |
| if (ec) { |
| auto read_status = grpc::Status(grpc::StatusCode::INTERNAL, |
| "Failed to read from socket."); |
| if (ec == boost::asio::error::eof) { |
| // EOF is received when the TCP server closes the connection. |
| // The Reactor needs to be closed, otherwise the stream will remain |
| // open until the client closes it. |
| read_status = grpc::Status::OK; |
| } |
| LOG(INFO) << "Finished reading from socket with error: " |
| << ec.message(); |
| MaybeFinish(read_status); |
| return; |
| } |
| ChannelResponse response; |
| response.mutable_data()->set_data(buffer_.data(), bytes_read); |
| MaybeStartWrite(&response); |
| absl::MutexLock lock(finished_mutex_); |
| if (has_finished_) { |
| return; |
| } |
| StartTcpRead(); |
| }); |
| } |