| #ifndef THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SUBSCRIPTION_MANAGER_FAKE_H_ |
| #define THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SUBSCRIPTION_MANAGER_FAKE_H_ |
| |
| #include <atomic> |
| #include <chrono> // NOLINT |
| #include <cstdint> |
| #include <thread> // NOLINT |
| #include <utility> |
| #include <vector> |
| |
| #include "absl/base/thread_annotations.h" |
| #include "absl/functional/any_invocable.h" |
| #include "absl/status/status.h" |
| #include "absl/status/statusor.h" |
| #include "absl/strings/str_cat.h" |
| #include "absl/synchronization/mutex.h" |
| #include "tlbmc/subscription/manager.h" |
| #include "payload.pb.h" |
| #include "subscription_params.pb.h" |
| |
| namespace milotic_hft { |
| |
| // A fake implementation of the subscription manager. |
| // This class is thread-compatible. |
| // Call `SetFakeData` before calling `AddSubscription`. |
| class SubscriptionManagerFake : public SubscriptionManager { |
| public: |
| void SetFakeData(Payload&& payload) { payload_ = std::move(payload); } |
| |
| // The background thread will create fake data according to the subscription |
| // params and call the callback. |
| absl::StatusOr<SubscriptionId> AddSubscription( |
| const SubscriptionParams& params, |
| absl::AnyInvocable<void(Payload&&)> on_data_callback) override { |
| absl::MutexLock lock(&mutex_); |
| background_threads_.push_back( |
| std::thread([params, on_data_callback = std::move(on_data_callback), |
| this]() mutable { |
| for (int32_t i = 0; i < params.num_batches(); ++i) { |
| // Sleep just for 1ms instead of `params.sampling_interval_ms()` to |
| // speed up the test. |
| std::this_thread::sleep_for(std::chrono::milliseconds(1)); |
| Payload payload = payload_; |
| on_data_callback(std::move(payload)); |
| } |
| })); |
| return absl::StrCat("sub_", num_subscriptions_++); |
| } |
| |
| absl::Status Unsubscribe(const SubscriptionId&) override { |
| return absl::OkStatus(); |
| } |
| |
| ~SubscriptionManagerFake() override { |
| absl::MutexLock lock(&mutex_); |
| for (std::thread& thread : background_threads_) { |
| if (thread.joinable()) { |
| thread.join(); |
| } |
| } |
| } |
| |
| private: |
| Payload payload_; |
| absl::Mutex mutex_; |
| std::atomic<uint32_t> num_subscriptions_ = 0; |
| std::vector<std::thread> ABSL_GUARDED_BY(mutex_) background_threads_; |
| }; |
| |
| } // namespace milotic_hft |
| #endif // THIRD_PARTY_MILOTIC_EXTERNAL_CC_HFT_SUBSCRIPTION_MANAGER_FAKE_H_ |