blob: 332af16a9c936f35f73cc146e8dbab266f8771b6 [file] [log] [blame]
#ifndef THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SUBSCRIPTION_MANAGER_FAKE_H_
#define THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SUBSCRIPTION_MANAGER_FAKE_H_
#include <atomic>
#include <chrono> // NOLINT
#include <cstdint>
#include <thread> // NOLINT
#include <utility>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/synchronization/mutex.h"
#include "tlbmc/subscription/manager.h"
#include "payload.pb.h"
#include "subscription_params.pb.h"
namespace milotic_hft {
// A fake implementation of the subscription manager.
// This class is thread-compatible.
// Call `SetFakeData` before calling `AddSubscription`.
class SubscriptionManagerFake : public SubscriptionManager {
public:
void SetFakeData(Payload&& payload) { payload_ = std::move(payload); }
// The background thread will create fake data according to the subscription
// params and call the callback.
absl::StatusOr<SubscriptionId> AddSubscription(
const SubscriptionParams& params,
absl::AnyInvocable<void(Payload&&)> on_data_callback) override {
absl::MutexLock lock(&mutex_);
background_threads_.push_back(
std::thread([params, on_data_callback = std::move(on_data_callback),
this]() mutable {
for (int32_t i = 0; i < params.num_batches(); ++i) {
// Sleep just for 1ms instead of `params.sampling_interval_ms()` to
// speed up the test.
std::this_thread::sleep_for(std::chrono::milliseconds(1));
Payload payload = payload_;
on_data_callback(std::move(payload));
}
}));
return absl::StrCat("sub_", num_subscriptions_++);
}
absl::Status Unsubscribe(const SubscriptionId&) override {
return absl::OkStatus();
}
~SubscriptionManagerFake() override {
absl::MutexLock lock(&mutex_);
for (std::thread& thread : background_threads_) {
if (thread.joinable()) {
thread.join();
}
}
}
private:
Payload payload_;
absl::Mutex mutex_;
std::atomic<uint32_t> num_subscriptions_ = 0;
std::vector<std::thread> ABSL_GUARDED_BY(mutex_) background_threads_;
};
} // namespace milotic_hft
#endif // THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SUBSCRIPTION_MANAGER_FAKE_H_