blob: 2087defaa9ddc03bbd6f5d9f771e4b21c984f0cc [file] [log] [blame]
#include "proxy_voyager_impl.h"
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include "gmock.h"
#include "gunit.h"
#include "absl/log/globals.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "redfish_query_engine/protobuf/parse.h"
#include "grpcpp/channel.h"
#include "grpcpp/client_context.h"
#include "grpcpp/server_builder.h"
#include "grpcpp/support/channel_arguments.h"
#include "grpcpp/support/status.h"
#include "grpcpp/support/sync_stream.h"
#include "deferred_status.h"
#include "mock_redfish_plugin.h"
#include "proxy.h"
#include "proxy_config.pb.h"
#include "redfish_plugin.h"
#include "request_response.h"
#include "sse/sse_parser.h"
#include "voyager/deferrable_priority_queue.hpp"
#include "voyager/voyager_telemetry.grpc.pb.h"
#include "voyager/voyager_telemetry.pb.h"
#include "util/task/status_macros.h"
// TODO(b/290052834): Move some test coverage from redfish_test.cc, add more
// tests.
namespace {
using ::testing::_;
using ::testing::AllOf;
using ::testing::AnyOf;
using ::testing::Contains;
using ::testing::EqualsProto;
using ::testing::Field;
using ::testing::Not;
using ::testing::Pair;
using ::testing::Pointee;
using ::testing::Return;
using ::testing::status::IsOk;
using ::testing::status::StatusIs;
using ::milotic::Proxy;
using ::milotic::ProxyRequest;
using ::milotic::ProxyResponse;
using ::milotic::RedfishPlugin;
using MockPlugin = milotic::MockRedfishPlugin;
struct TestServer {
std::unique_ptr<Proxy> proxy;
MockPlugin& plugin;
std::shared_ptr<grpc::Channel> channel;
std::unique_ptr<third_party_voyager::MachineTelemetry::Stub> stub;
};
absl::StatusOr<TestServer> CreateTestServer(
absl::string_view endpoint, voyager::DeferrablePriorityQueue* queue,
milotic_grpc_proxy::AuthorizationPolicy authorization_policy = ecclesia::
ParseTextAsProtoOrDie<milotic_grpc_proxy::AuthorizationPolicy>(R"pb(
mappings: {
name: "match_all"
allow: { key: "all" }
resource_path: ""
with_subtree: true
})pb")) {
absl::SetVLogLevel("proxy*", 1);
std::unique_ptr<RedfishPlugin> plugins[] = {std::make_unique<MockPlugin>()};
auto& plugin = *static_cast<MockPlugin*>(plugins[0].get());
EXPECT_CALL(plugin, Initialize).WillOnce(Return(absl::OkStatus()));
TestServer test_server = {.proxy = std::make_unique<Proxy>(
endpoint, plugins, queue, Proxy::Resources(),
std::move(authorization_policy)),
.plugin = plugin};
RETURN_IF_ERROR(test_server.proxy->AddService(
milotic_grpc_proxy::VoyagerTelemetryOptions::default_instance()));
RETURN_IF_ERROR(test_server.proxy->ConfigGrpcAndStart(
milotic_grpc_proxy::GrpcConfiguration(),
[](const std::string&) { return true; }));
test_server.channel = test_server.proxy->grpc_server()->InProcessChannel(
grpc::ChannelArguments());
test_server.stub =
third_party_voyager::MachineTelemetry::NewStub(test_server.channel);
return test_server;
}
TEST(ProxyVoyagerImplTest, RequestHttpConversionWorks) {
voyager::DeferrablePriorityQueue queue(1);
ASSERT_OK_AND_ASSIGN(TestServer test_server,
CreateTestServer("test_endpoint", &queue));
EXPECT_CALL(test_server.plugin, PreprocessRequest)
.WillRepeatedly(Return(RedfishPlugin::RequestAction::kHandle));
EXPECT_CALL(
test_server.plugin,
HandleRequest(
RedfishPlugin::RequestVerb::kGet,
Pointee(AllOf(
Field("uri", &ProxyRequest::uri, "test_endpoint/some/resource"),
Field("headers", &ProxyRequest::headers,
Contains(Pair("X-Test-Header", "test_value")))))))
.WillOnce(
Return(ProxyResponse{{.code = 200,
.body = R"json({"Name": "test json"})json",
.headers = {{"Content-Type", "text/json"}}}}));
auto request =
ecclesia::ParseTextAsProtoOrDie<third_party_voyager::Request>(R"pb(
req_id: "test_req"
req_fqp: { fqp: { specifier: "/some/resource" } }
http_headers: { key: "X-Test-Header" value: "test_value" }
)pb");
grpc::ClientContext context;
third_party_voyager::Update response;
milotic::DeferredStatus status;
test_server.stub->async()->Get(&context, &request, &response,
status.Setter());
ASSERT_OK(queue.ProcessQueue(1));
EXPECT_OK(status.AwaitStatus());
EXPECT_THAT(response, EqualsProto(R"pb(
req_id: "test_req"
code: 200
http_headers { key: "content-type", value: "text/json" }
data_points {
res_fqp { specifier: "/some/resource" }
json: '{"Name": "test json"}'
}
)pb"));
}
TEST(ProxyVoyagerImplTest, HostHeaderIsReplaced) {
voyager::DeferrablePriorityQueue queue(1);
ASSERT_OK_AND_ASSIGN(TestServer test_server,
CreateTestServer("test_endpoint", &queue));
auto request =
ecclesia::ParseTextAsProtoOrDie<third_party_voyager::Request>(R"pb(
req_id: "test_req"
req_fqp: { fqp: { specifier: "/some/resource" } }
http_headers: { key: "host", value: "unused_host" }
)pb");
EXPECT_CALL(test_server.plugin, PreprocessRequest)
.WillRepeatedly(Return(RedfishPlugin::RequestAction::kHandle));
EXPECT_CALL(
test_server.plugin,
HandleRequest(
RedfishPlugin::RequestVerb::kGet,
Pointee(AllOf(
Field("uri", &ProxyRequest::uri, "test_endpoint/some/resource"),
Field("headers", &ProxyRequest::headers,
Contains(Pair("Host", "test_endpoint")))))))
.WillOnce(
Return(ProxyResponse{{.code = 200,
.body = R"json({"Name": "test json"})json",
.headers = {{"Content-Type", "text/json"}}}}));
grpc::ClientContext context;
third_party_voyager::Update response;
milotic::DeferredStatus status;
test_server.stub->async()->Get(&context, &request, &response,
status.Setter());
ASSERT_OK(queue.ProcessQueue(1));
EXPECT_OK(status.AwaitStatus());
}
TEST(ProxyVoyagerImplTest, SetRequestHttpConversionWorks) {
voyager::DeferrablePriorityQueue queue(1);
ASSERT_OK_AND_ASSIGN(TestServer test_server,
CreateTestServer("test_endpoint", &queue));
EXPECT_CALL(test_server.plugin, PreprocessRequest)
.WillRepeatedly(Return(RedfishPlugin::RequestAction::kHandle));
EXPECT_CALL(
test_server.plugin,
HandleRequest(
RedfishPlugin::RequestVerb::kPost,
Pointee(AllOf(
Field("uri", &ProxyRequest::uri, "test_endpoint/some/resource"),
Field("headers", &ProxyRequest::headers,
Contains(Pair("X-Test-Header", "test_value"))),
Field("headers", &ProxyRequest::headers,
Contains(Pair("Content-Type", "application/json")))))))
.WillOnce(
Return(ProxyResponse{{.code = 200,
.body = R"json({"Name": "response"})json",
.headers = {{"Content-Type", "text/json"}}}}));
auto request =
ecclesia::ParseTextAsProtoOrDie<third_party_voyager::SetRequest>(R"pb(
req_id: "test_req"
req_fqp: { fqp: { specifier: "/some/resource" } }
http_headers: { key: "X-Test-Header" value: "test_value" }
json: '{"Name": "test json"}'
)pb");
grpc::ClientContext context;
third_party_voyager::Update response;
milotic::DeferredStatus status;
test_server.stub->async()->Post(&context, &request, &response,
status.Setter());
ASSERT_OK(queue.ProcessQueue(1));
EXPECT_OK(status.AwaitStatus());
EXPECT_THAT(response, EqualsProto(R"pb(
req_id: "test_req"
code: 200
http_headers { key: "content-type", value: "text/json" }
data_points {
res_fqp { specifier: "/some/resource" }
json: '{"Name": "response"}'
}
)pb"));
}
TEST(ProxyVoyagerImplTest, SetRequestNoRequestData) {
voyager::DeferrablePriorityQueue queue(1);
ASSERT_OK_AND_ASSIGN(TestServer test_server,
CreateTestServer("test_endpoint", &queue));
EXPECT_CALL(test_server.plugin, PreprocessRequest)
.WillRepeatedly(Return(RedfishPlugin::RequestAction::kHandle));
EXPECT_CALL(
test_server.plugin,
HandleRequest(
RedfishPlugin::RequestVerb::kPost,
Pointee(AllOf(
Field("uri", &ProxyRequest::uri, "test_endpoint/some/resource"),
Field("headers", &ProxyRequest::headers,
Contains(Pair("X-Test-Header", "test_value"))),
Field("headers", &ProxyRequest::headers,
Not(Contains(Pair("Content-Type", _))))))))
.WillOnce(
Return(ProxyResponse{{.code = 200,
.body = R"json({"Name": "response"})json",
.headers = {{"Content-Type", "text/json"}}}}));
auto request =
ecclesia::ParseTextAsProtoOrDie<third_party_voyager::SetRequest>(R"pb(
req_id: "test_req"
req_fqp: { fqp: { specifier: "/some/resource" } }
http_headers: { key: "X-Test-Header" value: "test_value" }
)pb");
grpc::ClientContext context;
third_party_voyager::Update response;
milotic::DeferredStatus status;
test_server.stub->async()->Post(&context, &request, &response,
status.Setter());
ASSERT_OK(queue.ProcessQueue(1));
EXPECT_OK(status.AwaitStatus());
EXPECT_THAT(response, EqualsProto(R"pb(
req_id: "test_req"
code: 200
http_headers { key: "content-type", value: "text/json" }
data_points {
res_fqp { specifier: "/some/resource" }
json: '{"Name": "response"}'
}
)pb"));
}
TEST(ProxyVoyagerImplTest, SetRequestKeyValueDataError) {
voyager::DeferrablePriorityQueue queue(1);
ASSERT_OK_AND_ASSIGN(TestServer test_server,
CreateTestServer("test_endpoint", &queue));
auto request =
ecclesia::ParseTextAsProtoOrDie<third_party_voyager::SetRequest>(R"pb(
req_id: "test_req"
req_fqp: { fqp: { specifier: "/some/resource" } }
http_headers: { key: "X-Test-Header" value: "test_value" }
key_value: {
fields: {
key: "test"
value: { string_val: "value" }
}
}
)pb");
grpc::ClientContext context;
third_party_voyager::Update response;
milotic::DeferredStatus status;
test_server.stub->async()->Post(&context, &request, &response,
status.Setter());
EXPECT_EQ(status.AwaitStatus().error_code(), grpc::StatusCode::UNIMPLEMENTED);
}
TEST(ProxyVoyagerImplTest, EmptyRequestIsServed) {
voyager::DeferrablePriorityQueue queue(1);
ASSERT_OK_AND_ASSIGN(TestServer test_server,
CreateTestServer("test_endpoint", &queue));
auto request =
ecclesia::ParseTextAsProtoOrDie<third_party_voyager::Request>(R"pb(
req_id: "empty_req"
)pb");
third_party_voyager::Update response;
grpc::ClientContext context;
milotic::DeferredStatus status;
test_server.stub->async()->Get(&context, &request, &response,
status.Setter());
EXPECT_OK(status.AwaitStatus());
EXPECT_THAT(response, EqualsProto("req_id: 'empty_req'"));
}
TEST(ProxyVoyagerImplTest, SseEventsAreServed) {
voyager::DeferrablePriorityQueue queue(1);
ASSERT_OK_AND_ASSIGN(TestServer test_server,
CreateTestServer("test_endpoint", &queue));
EXPECT_CALL(test_server.plugin, PreprocessRequest)
.WillRepeatedly(Return(RedfishPlugin::RequestAction::kHandle));
EXPECT_CALL(
test_server.plugin,
Subscribe(
Pointee(Field(&ProxyRequest::uri, "test_endpoint/some/resource")), _))
.WillOnce([](std::unique_ptr<ProxyRequest>,
RedfishPlugin::EventHandler* handler) {
EXPECT_OK(handler->OnResponse(ProxyResponse{
{.code = 200,
.headers = {{"Content-Type", "text/event-stream"}}}}));
EXPECT_OK(
handler->OnEvent(milotic::ServerSentEvent{.event = "test_event",
.data = "test data 1",
.id = "event1",
.retry = 20}));
EXPECT_OK(handler->OnEvent(milotic::ServerSentEvent{
.event = "test_event", .data = "test data 2", .id = "event2"}));
return absl::OkStatus();
});
auto request =
ecclesia::ParseTextAsProtoOrDie<third_party_voyager::Request>(R"pb(
req_id: "test_req"
req_fqp: { fqp: { specifier: "/some/resource" } }
)pb");
grpc::ClientContext context;
std::unique_ptr<grpc::ClientReader<third_party_voyager::Update>>
subscription = test_server.stub->Subscribe(&context, request);
third_party_voyager::Update response;
EXPECT_TRUE(subscription->Read(&response));
EXPECT_THAT(response, EqualsProto(R"pb(
req_id: "test_req"
code: 200
http_headers { key: "content-type", value: "text/event-stream" }
data_points {
res_fqp { specifier: "/some/resource" }
key_value: {
fields {
key: "event",
value: { string_val: "test_event" }
}
fields {
key: "data",
value: { string_val: "test data 1" }
}
fields {
key: "id",
value: { string_val: "event1" }
}
fields {
key: "retry",
value: { uint_val: 20 }
}
}
}
)pb"));
EXPECT_TRUE(subscription->Read(&response));
EXPECT_THAT(response, EqualsProto(R"pb(
req_id: "test_req"
code: 200
http_headers { key: "content-type", value: "text/event-stream" }
data_points {
res_fqp { specifier: "/some/resource" }
key_value: {
fields {
key: "event",
value: { string_val: "test_event" }
}
fields {
key: "data",
value: { string_val: "test data 2" }
}
fields {
key: "id",
value: { string_val: "event2" }
}
}
}
)pb"));
EXPECT_OK(subscription->Finish());
}
TEST(ProxyVoyagerImplTest, ProtoEventsAreServed) {
voyager::DeferrablePriorityQueue queue(1);
ASSERT_OK_AND_ASSIGN(TestServer test_server,
CreateTestServer("test_endpoint", &queue));
EXPECT_CALL(test_server.plugin, PreprocessRequest)
.WillRepeatedly(Return(RedfishPlugin::RequestAction::kHandle));
EXPECT_CALL(
test_server.plugin,
Subscribe(
Pointee(Field(&ProxyRequest::uri, "test_endpoint/some/resource")), _))
.WillOnce([](std::unique_ptr<ProxyRequest>,
RedfishPlugin::EventHandler* handler) {
EXPECT_OK(handler->OnResponse(ProxyResponse{
{.code = 200,
.headers = {{"Content-Type", "text/event-stream"}}}}));
third_party_voyager::Update update;
update.mutable_data_points()->Add()->set_json(
R"json({"key": 1000})json");
EXPECT_OK(handler->OnEvent(update));
update.mutable_data_points(0)->set_json(R"json({"key": 2000})json");
EXPECT_OK(handler->OnEvent(update));
return absl::OkStatus();
});
auto request =
ecclesia::ParseTextAsProtoOrDie<third_party_voyager::Request>(R"pb(
req_id: "test_req"
req_fqp: { fqp: { specifier: "/some/resource" } }
)pb");
grpc::ClientContext context;
std::unique_ptr<grpc::ClientReader<third_party_voyager::Update>>
subscription = test_server.stub->Subscribe(&context, request);
third_party_voyager::Update response1;
third_party_voyager::Update response2;
EXPECT_TRUE(subscription->Read(&response1));
EXPECT_TRUE(subscription->Read(&response2));
EXPECT_OK(subscription->Finish());
EXPECT_THAT(response1, EqualsProto(R"pb(
req_id: "test_req"
code: 200
http_headers { key: "content-type", value: "text/event-stream" }
data_points {
res_fqp { specifier: "/some/resource" }
json: '{"key": 1000}'
}
)pb"));
EXPECT_THAT(response2, EqualsProto(R"pb(
req_id: "test_req"
code: 200
http_headers { key: "content-type", value: "text/event-stream" }
data_points {
res_fqp { specifier: "/some/resource" }
json: '{"key": 2000}'
}
)pb"));
}
TEST(ProxyVoyagerImplTest, SubscriptionIsCancelled) {
voyager::DeferrablePriorityQueue queue(1);
ASSERT_OK_AND_ASSIGN(TestServer test_server,
CreateTestServer("test_endpoint", &queue));
EXPECT_CALL(test_server.plugin, PreprocessRequest)
.WillRepeatedly(Return(RedfishPlugin::RequestAction::kHandle));
EXPECT_CALL(
test_server.plugin,
Subscribe(
Pointee(Field(&ProxyRequest::uri, "test_endpoint/some/resource")), _))
.WillOnce([](std::unique_ptr<ProxyRequest>,
RedfishPlugin::EventHandler* handler) {
EXPECT_OK(handler->OnResponse(ProxyResponse{
{.code = 200,
.headers = {{"Content-Type", "text/event-stream"}}}}));
third_party_voyager::Update update;
update.mutable_data_points()->Add()->set_json(
R"json({"key": 1000})json");
while (!handler->IsCancelled()) {
EXPECT_THAT(handler->OnEvent(update),
AnyOf(IsOk(), StatusIs(absl::StatusCode::kCancelled)));
}
return absl::OkStatus();
});
auto request =
ecclesia::ParseTextAsProtoOrDie<third_party_voyager::Request>(R"pb(
req_id: "test_req"
req_fqp: { fqp: { specifier: "/some/resource" } }
)pb");
grpc::ClientContext context;
std::unique_ptr<grpc::ClientReader<third_party_voyager::Update>>
subscription = test_server.stub->Subscribe(&context, request);
third_party_voyager::Update response;
EXPECT_TRUE(subscription->Read(&response));
context.TryCancel();
absl::Status finished_status = subscription->Finish();
EXPECT_THAT(response, EqualsProto(R"pb(
req_id: "test_req"
code: 200
http_headers { key: "content-type", value: "text/event-stream" }
data_points {
res_fqp { specifier: "/some/resource" }
json: '{"key": 1000}'
}
)pb"));
EXPECT_THAT(finished_status, StatusIs(absl::StatusCode::kCancelled));
}
TEST(ProxyVoyagerImplTest, EmptySubscriptionReturnsImmediately) {
voyager::DeferrablePriorityQueue queue(1);
ASSERT_OK_AND_ASSIGN(TestServer test_server,
CreateTestServer("test_endpoint", &queue));
ON_CALL(test_server.plugin, PreprocessRequest)
.WillByDefault(Return(RedfishPlugin::RequestAction::kHandle));
EXPECT_CALL(test_server.plugin, Subscribe).Times(0);
auto request =
ecclesia::ParseTextAsProtoOrDie<third_party_voyager::Request>(R"pb(
req_id: "test_req"
)pb");
grpc::ClientContext context;
std::unique_ptr<grpc::ClientReader<third_party_voyager::Update>>
subscription = test_server.stub->Subscribe(&context, request);
third_party_voyager::Update response;
EXPECT_FALSE(subscription->Read(&response));
EXPECT_OK(subscription->Finish());
}
TEST(ProxyVoyagerImplTest, RequestsArePrioritizedCorrectly) {
voyager::DeferrablePriorityQueue queue(3);
ASSERT_OK_AND_ASSIGN(TestServer test_server,
CreateTestServer("test_endpoint", &queue));
EXPECT_CALL(test_server.plugin, PreprocessRequest)
.WillRepeatedly(Return(RedfishPlugin::RequestAction::kHandle));
testing::InSequence seq;
EXPECT_CALL(test_server.plugin,
HandleRequest(RedfishPlugin::RequestVerb::kGet,
Pointee(Field(&ProxyRequest::uri,
"test_endpoint/test/high"))))
.WillOnce(Return(ProxyResponse{{.code = 200}}));
EXPECT_CALL(test_server.plugin,
HandleRequest(
RedfishPlugin::RequestVerb::kGet,
Pointee(Field(&ProxyRequest::uri, "test_endpoint/test/med"))))
.WillOnce(Return(ProxyResponse{{.code = 200}}));
EXPECT_CALL(test_server.plugin,
HandleRequest(
RedfishPlugin::RequestVerb::kGet,
Pointee(Field(&ProxyRequest::uri, "test_endpoint/test/low"))))
.WillOnce(Return(ProxyResponse{{.code = 200}}));
third_party_voyager::Update update_low;
auto request_low =
ecclesia::ParseTextAsProtoOrDie<third_party_voyager::Request>(R"pb(
req_id: "low_prio"
req_fqp {
fqp { specifier: "/test/low" }
priority: FQP_PRI_LOW
}
)pb");
grpc::ClientContext low_context;
milotic::DeferredStatus low_status;
test_server.stub->async()->Get(&low_context, &request_low, &update_low,
low_status.Setter());
third_party_voyager::Update update_med;
auto request_med =
ecclesia::ParseTextAsProtoOrDie<third_party_voyager::Request>(R"pb(
req_id: "med_prio"
req_fqp {
fqp { specifier: "/test/med" }
priority: FQP_PRI_MED
}
)pb");
grpc::ClientContext med_context;
milotic::DeferredStatus med_status;
test_server.stub->async()->Get(&med_context, &request_med, &update_med,
med_status.Setter());
third_party_voyager::Update update_high;
auto request_high =
ecclesia::ParseTextAsProtoOrDie<third_party_voyager::Request>(R"pb(
req_id: "high_prio"
req_fqp {
fqp { specifier: "/test/high" }
priority: FQP_PRI_HIG
}
)pb");
grpc::ClientContext high_context;
milotic::DeferredStatus high_status;
test_server.stub->async()->Get(&high_context, &request_high, &update_high,
high_status.Setter());
// Wait for all items to be added to the queue. Even though we have called the
// RPC with each priority already, it is possible that the server code
// (running outside this thread) hasn't gotten far enough to enqueue the
// requests in the priority queue.
while (queue.Size() < 3) {
absl::SleepFor(absl::Milliseconds(100));
}
ASSERT_OK(queue.ProcessQueue(3));
EXPECT_OK(high_status.AwaitStatus());
EXPECT_OK(med_status.AwaitStatus());
EXPECT_OK(low_status.AwaitStatus());
EXPECT_THAT(update_low.code(), 200);
EXPECT_THAT(update_med.code(), 200);
EXPECT_THAT(update_high.code(), 200);
}
TEST(ProxyVoyagerImplTest, UnauthorizedResourceAccessDenied) {
voyager::DeferrablePriorityQueue queue(1);
ASSERT_OK_AND_ASSIGN(
TestServer test_server,
CreateTestServer("test_endpoint", &queue,
ecclesia::ParseTextAsProtoOrDie<
milotic_grpc_proxy::AuthorizationPolicy>(R"pb(
mappings: {
name: "mapping1"
allow: {
key: "test_rule"
value: { permission_id: "test-permission" }
}
resource_path: "/some/protected/resource"
}
)pb")));
EXPECT_CALL(test_server.plugin, PreprocessRequest).Times(0);
auto request =
ecclesia::ParseTextAsProtoOrDie<third_party_voyager::Request>(R"pb(
req_id: "test_req"
req_fqp: { fqp: { specifier: "/some/protected/resource" } }
)pb");
grpc::ClientContext context;
third_party_voyager::Update response;
milotic::DeferredStatus status;
test_server.stub->async()->Get(&context, &request, &response,
status.Setter());
EXPECT_THAT(status.AwaitStatus(),
StatusIs(absl::StatusCode::kPermissionDenied));
}
} // namespace