#include "NVMeCache.hpp"
#include <valgrind/valgrind.h>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
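// Valgrind-instrumented runs are roughly an order of magnitude slower, so
// scale every test duration by 10x when running under Valgrind.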
const int valgrindTimeScaler = []() {
return (RUNNING_ON_VALGRIND != 0U) ? 10 : 1;
}();
class TestNVMeCache : public testing::Test
{
public:
boost::asio::io_context io;
std::shared_ptr<Scheduler<std::chrono::steady_clock>> scheduler;
TestNVMeCache()
{
scheduler = std::make_shared<Scheduler<std::chrono::steady_clock>>(io);
}
};
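// gmock stand-in for a cache-backed metric; the tests control readDevice to
// simulate device I/O and to observe the scheduler's behavior.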
class MetricMock : public MetricBase<std::chrono::steady_clock>
{
public:
template <class Duration>
MetricMock(std::shared_ptr<Scheduler<std::chrono::steady_clock>> scheduler,
Duration interval) :
MetricBase<std::chrono::steady_clock>(
std::move(scheduler),
std::chrono::duration_cast<std::chrono::steady_clock::duration>(
interval))
{}
~MetricMock() override
{
std::cerr << "MetricMock destroy\n";
}
MOCK_METHOD(
void, readDevice,
(std::function<void(std::error_code ec, size_t size, bool complete)> &&
cb),
(noexcept, override));
MOCK_METHOD((std::string_view), getIdentifier, (),
(const, noexcept, override));
MOCK_METHOD(bool, isCacheValid, (), (const, noexcept, override));
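// getCache exposes the cached bytes together with two time points; their
// exact semantics are defined by MetricBase.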
MOCK_METHOD((std::tuple<std::chrono::time_point<ClockType>,
std::chrono::time_point<ClockType>,
std::span<const uint8_t>>),
getCache, (), (const, noexcept, override));
};
/**
 * Simulate the scheduling sequence for
 * * NVMe subsystem initiation
 * * Metric instance creation (after subsystem init)
 * * Refresh/PartialRefresh
 * The scheduling process should comply with the flowchart:
 * go/nvme-mi-cache-time-scheduling#heading=h.iolas4v1li7i
 */
TEST_F(TestNVMeCache, ScheduleSequence)
{
// number of partial-read blocks that make up one full refresh
static constexpr unsigned refreshStep = 4;
boost::asio::spawn(io, [this](boost::asio::yield_context yield) {
/* Test: metric creation within system init procedure */
// metric 1 with 200 ms refresh rate
auto mtx = std::make_shared<MetricMock>(this->scheduler,
std::chrono::milliseconds(200) *
valgrindTimeScaler);
ON_CALL(*mtx, readDevice)
.WillByDefault(
[this](std::function<void(std::error_code ec, size_t size,
bool complete)>&& cb) {
boost::asio::spawn(io,
[this, cb](boost::asio::yield_context yield) {
static unsigned i = 0;
static std::chrono::steady_clock::time_point prev{
std::chrono::steady_clock::time_point::min()};
std::chrono::steady_clock::time_point current =
std::chrono::steady_clock::now();
if (prev != std::chrono::steady_clock::time_point::min())
{
// consecutive refreshes should occur within the designated
// interval + device reading delay + some buffer
EXPECT_LE(current - prev,
std::chrono::duration_cast<
std::chrono::steady_clock::duration>(
std::chrono::milliseconds(
static_cast<unsigned>((200 + 10) * 1.1)) *
valgrindTimeScaler));
}
prev = current;
// simulate a device delay
boost::asio::steady_timer timer(this->io);
timer.expires_after(std::chrono::milliseconds(10) *
valgrindTimeScaler);
timer.async_wait(yield);
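// every refreshStep-th read reports the refresh cycle as complete;
// 409 is an arbitrary payload size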
bool complete = (++i % refreshStep) == 0U;
cb({}, 409, complete);
});
});
// no cache data fetch should happen until subsystem
// initiation completes
EXPECT_CALL(*mtx, readDevice).Times(0);
// force refresh for metric initialization
mtx->refresh([mtx](std::error_code ec) {
if (ec)
{
std::cerr << "refresh cb1: error returned\n";
return;
}
// the background refresh should continue
EXPECT_CALL(*mtx, readDevice).Times(::testing::AtLeast(1));
});
// simulate a subsystem initiation delay
boost::asio::spawn(io, [this](boost::asio::yield_context yield) {
boost::asio::steady_timer timer(this->io);
timer.expires_after(std::chrono::milliseconds(200) *
valgrindTimeScaler);
timer.async_wait(yield);
});
/* subsystem initiation done */
// should fetch data refreshStep (4) times for metric 1
EXPECT_CALL(*mtx, readDevice).Times(refreshStep);
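// start the scheduler; dequeue() dispatches the refresh request queued
// during subsystem initiation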
this->scheduler->start();
this->scheduler->dequeue();
/* Test: metric created after system initiation */
boost::asio::steady_timer timer(this->io);
timer.expires_after(std::chrono::milliseconds(100) *
valgrindTimeScaler);
timer.async_wait(yield);
auto mtx2 = std::make_shared<MetricMock>(
this->scheduler,
std::chrono::milliseconds(300) * valgrindTimeScaler);
ON_CALL(*mtx2, readDevice)
.WillByDefault(
[this](std::function<void(std::error_code ec, size_t size,
bool complete)>&& cb) {
boost::asio::spawn(io,
[this, cb](boost::asio::yield_context yield) {
static unsigned i = 0;
// simulate a device delay
boost::asio::steady_timer timer(this->io);
timer.expires_after(std::chrono::milliseconds(10) *
valgrindTimeScaler);
timer.async_wait(yield);
bool complete = (++i % refreshStep) == 0U;
cb({}, 409, complete);
});
});
// the force refresh should start immediately
EXPECT_CALL(*mtx2, readDevice).Times(refreshStep);
// force refresh for metric initialization
mtx2->refresh([mtx2](std::error_code ec) {
if (ec)
{
std::cerr << "refresh cb2: error returned\n";
return;
}
// the background refresh should continue
EXPECT_CALL(*mtx2, readDevice).Times(::testing::AtLeast(1));
});
/* Test: refresh and partial refresh */
timer.expires_after(std::chrono::seconds(1) * valgrindTimeScaler);
timer.async_wait(yield);
mtx2->refresh([mtx2](std::error_code ec) {
if (ec)
{
std::cerr << "refresh cb3: error returned\n";
return;
}
// the background refresh should continue
EXPECT_CALL(*mtx2, readDevice).Times(::testing::AtLeast(1));
});
mtx2->partialRefresh([mtx2](std::error_code ec) {
if (ec)
{
std::cerr << "partial refresh cb1: error returned\n";
return;
}
// refresh should continue until completion
EXPECT_CALL(*mtx2, readDevice)
.Times(::testing::AtMost(refreshStep - 1));
});
// read device only once for partial refresh
EXPECT_CALL(*mtx2, readDevice).Times(1);
/* Test: Metric deletion */
timer.expires_after(std::chrono::seconds(1) * valgrindTimeScaler);
timer.async_wait(yield);
bool exitWithCancel = false;
mtx2->refresh([&exitWithCancel](std::error_code ec) {
if (ec)
{
exitWithCancel = true;
return;
}
});
// wait briefly so the task is pushed into the queue
timer.expires_after(std::chrono::milliseconds(1) * valgrindTimeScaler);
timer.async_wait(yield);
// destroy the metric
mtx2.reset();
// wait briefly until the async destruction completes
timer.expires_after(std::chrono::milliseconds(1) * valgrindTimeScaler);
timer.async_wait(yield);
// the pending refresh should have been cancelled by the destruction
EXPECT_EQ(exitWithCancel, true);
// keep the subsystem running
timer.expires_after(std::chrono::seconds(1) * valgrindTimeScaler);
timer.async_wait(yield);
});
io.run();
}
/**
 * Simulate concurrent callers of the cache refresh task.
 * Make sure the callbacks are issued with the expected behavior, as in the
 * example:
 *
 * go/nvme-mi-cache-time-scheduling#heading=h.xnlsabh6sfwj
 */
TEST_F(TestNVMeCache, ConcurrentInvoker)
{
boost::asio::spawn(io, [this](boost::asio::yield_context yield) {
/* create the scheduler and metric */
// metric with an infinite refresh interval so no background refresh occurs
auto mtx = std::make_shared<MetricMock>(
this->scheduler, std::chrono::steady_clock::duration::max());
// metric parameters
static constexpr unsigned refreshStep = 6;
static const auto deviceDelay = std::chrono::milliseconds(10) *
valgrindTimeScaler;
static unsigned currentStep = 0;
static std::chrono::steady_clock::time_point prevTime{
std::chrono::steady_clock::time_point::min()};
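// prevTime is recorded for debugging only; the assertions below rely on
// currentStep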
// the default readDevice reports a very large data size to block the
// background scheduling until the next partialRefresh
ON_CALL(*mtx, readDevice)
.WillByDefault(
[this](std::function<void(std::error_code ec, size_t size,
bool complete)>&& cb) {
boost::asio::spawn(io,
[this, cb](boost::asio::yield_context yield) {
std::chrono::steady_clock::time_point currentTime =
std::chrono::steady_clock::now();
prevTime = currentTime;
std::cerr << "step: " << currentStep << ", current time: "
<< currentTime.time_since_epoch().count() << '\n';
// simulate a device delay
boost::asio::steady_timer timer(this->io);
timer.expires_after(deviceDelay);
timer.async_wait(yield);
std::cerr << "step: " << currentStep << '\n';
std::cerr << "previous time: "
<< currentTime.time_since_epoch().count() << '\n';
bool complete = (++currentStep % refreshStep) == 0U;
if (currentStep == refreshStep)
{
currentStep = 0;
}
std::cerr << "complete: " << complete << '\n';
cb({}, 4096000000, complete);
});
});
// no cache data fetch should happen until subsystem
// initiation completes
EXPECT_CALL(*mtx, readDevice).Times(0);
this->scheduler->start();
this->scheduler->dequeue();
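// callers A through J issue overlapping refresh/partialRefresh requests;
// currentStep tracks where the simulated device read cycle is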
// caller A
std::cerr << "caller A\n";
EXPECT_CALL(*mtx, readDevice).Times(1);
mtx->partialRefresh([mtx](std::error_code ec) {
EXPECT_FALSE(ec);
std::cerr << "notify A\n";
// the background refresh should continue
EXPECT_CALL(*mtx, readDevice).Times(0);
});
boost::asio::steady_timer timer(this->io);
timer.expires_after(deviceDelay * 2);
timer.async_wait(yield);
// move forward to step 1
EXPECT_EQ(currentStep, 1);
/* caller B to caller G */
// to be verified later via currentStep
EXPECT_CALL(*mtx, readDevice).Times(::testing::AtLeast(1));
// caller B
std::cerr << "caller B\n";
mtx->refresh([mtx](std::error_code ec) {
EXPECT_FALSE(ec);
std::cerr << "notify B\n";
EXPECT_EQ(currentStep, 0);
// the scheduler should stop until the next caller (G)
EXPECT_CALL(*mtx, readDevice).Times(0);
});
// move forward to step 2
timer.expires_after(deviceDelay + deviceDelay / 10);
timer.async_wait(yield);
EXPECT_EQ(currentStep, 2);
// caller C
std::cerr << "caller C\n";
mtx->refresh([mtx](std::error_code ec) {
EXPECT_FALSE(ec);
std::cerr << "notify C\n";
EXPECT_EQ(currentStep, 0);
// the scheduler should stop until the next caller (G)
EXPECT_CALL(*mtx, readDevice).Times(0);
});
// metric should continue to step 3
timer.expires_after(deviceDelay);
timer.async_wait(yield);
EXPECT_EQ(currentStep, 3);
// caller D
std::cerr << "caller D\n";
mtx->partialRefresh([mtx](std::error_code ec) {
EXPECT_FALSE(ec);
std::cerr << "notify D\n";
EXPECT_EQ(currentStep, 4);
});
// caller E
std::cerr << "caller E\n";
mtx->partialRefresh([mtx](std::error_code ec) {
EXPECT_FALSE(ec);
std::cerr << "notify E\n";
EXPECT_EQ(currentStep, 4);
});
// move forward to next cycle (step 0)
timer.expires_after(deviceDelay * 10);
timer.async_wait(yield);
EXPECT_EQ(currentStep, 0);
// caller G
std::cerr << "caller G\n";
EXPECT_CALL(*mtx, readDevice).Times(1);
mtx->partialRefresh([mtx](std::error_code ec) {
EXPECT_FALSE(ec);
std::cerr << "notify G\n";
// no more data fetch until next caller
EXPECT_CALL(*mtx, readDevice).Times(0);
});
// move forward to step 1
timer.expires_after(deviceDelay * 2);
timer.async_wait(yield);
EXPECT_EQ(currentStep, 1);
// caller H
std::cerr << "caller H\n";
EXPECT_CALL(*mtx, readDevice).Times(::testing::AnyNumber());
mtx->partialRefresh([mtx](std::error_code ec) {
EXPECT_FALSE(ec);
std::cerr << "notify H\n";
EXPECT_EQ(currentStep, 2);
});
// move forward slightly
timer.expires_after(deviceDelay / 4);
timer.async_wait(yield);
// caller I
std::cerr << "caller I\n";
bool completeI = false;
mtx->refresh([&completeI](std::error_code ec) {
// should be cancelled
EXPECT_TRUE(ec);
std::cerr << "notify I\n";
// either step 3 or 4, depending on whether the async readDevice has
// completed
EXPECT_GE(currentStep, 3);
EXPECT_LE(currentStep, 4);
completeI = true;
});
// move forward to step 3
timer.expires_after(deviceDelay * 2);
timer.async_wait(yield);
// caller J
std::cerr << "caller J\n";
bool completeJ = false;
mtx->partialRefresh([&completeJ](std::error_code ec) {
// should be cancelled
EXPECT_TRUE(ec);
std::cerr << "notify J\n";
// either step 3 or 4, depending on whether the async readDevice has
// completed
EXPECT_GE(currentStep, 3);
EXPECT_LE(currentStep, 4);
completeJ = true;
});
// destroy the metric
mtx.reset();
// the notify I/J callbacks should run immediately after destruction
timer.expires_after(std::chrono::microseconds(1));
timer.async_wait(yield);
EXPECT_TRUE(completeI);
EXPECT_TRUE(completeJ);
timer.expires_after(deviceDelay * 10);
timer.async_wait(yield);
this->scheduler.reset();
});
io.run();
}
int main(int argc, char** argv)
{
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}