blob: eed575609bb2307ae92ebd582c224f5b9682c37c [file] [log] [blame] [edit]
#ifndef THIRD_PARTY_MILOTIC_EXTERNAL_CC_TLBMC_RCU_SIMPLE_RCU_H_
#define THIRD_PARTY_MILOTIC_EXTERNAL_CC_TLBMC_RCU_SIMPLE_RCU_H_
#include <atomic>
#include <cstddef>
#include <utility>
#include <vector>
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/synchronization/mutex.h"
// This is a very minimalistic implementation of RCU. It is intended to be used
// for simple cases where there will only be X updates to the data and this must
// be passed in when creating the object.
// We will not release objects even if there are no active readers.
// In limited updates mode (max_updates != static_cast<size_t>(-1)):
// the size of the vector will be at most the maximum number of updates + 1. No
// cleanup will be done on the vector.
// In unlimited updates mode (max_updates == static_cast<size_t>(-1)):
// we keep at most 100 versions in a circular buffer and, once it is full, each
// update overwrites the oldest slot.
// The MOST important thing to note about this data structure is that it allows
// for LOCKLESS GETS.
namespace milotic_tlbmc {
// SimpleRcu<T> publishes successive versions of a value so that readers can
// fetch the current version without taking a lock.
//
// Writers call Update(), which is serialized by an internal mutex. Readers call
// Get(), which performs one atomic load of the current index. All storage is
// reserved in the constructor, so appending a version never reallocates the
// vector and previously returned pointers keep pointing into the same buffer.
//
// In unlimited mode the buffer is circular: a slot is reused
// kCircularBufferSize updates after it was written, at which point a pointer
// previously returned by Get() for that slot observes the new contents.
template <typename T>
class SimpleRcu {
 public:
  // Creates an empty holder. No storage is reserved, so Get() returns nullptr
  // and Update() always fails with "maximum number of updates" (the limit is
  // 0).
  SimpleRcu() = default;

  // Stores `data` as the initial version.
  //
  // `max_updates` is the number of Update() calls that may succeed afterwards.
  // Passing static_cast<size_t>(-1) selects unlimited mode, where versions are
  // kept in a circular buffer of kCircularBufferSize slots.
  explicit SimpleRcu(T&& data, size_t max_updates)
      : max_updates_(max_updates),
        unlimited_updates_(max_updates == kUnlimitedUpdates),
        has_value_(true) {
    // Reserve everything up front: element addresses must never change once
    // they have been handed out by Get().
    data_.reserve(unlimited_updates_ ? kCircularBufferSize : max_updates + 1);
    data_.push_back(std::move(data));
    current_index_.store(0, std::memory_order_release);
  }

  // Publishes `data` as the new current version.
  //
  // Returns OkStatus on success. In limited mode, returns an InternalError once
  // `max_updates` updates have already been applied; the stored versions are
  // left untouched in that case.
  absl::Status Update(T&& data) {
    absl::MutexLock lock(mutex_);
    // Every store to current_index_ happens under mutex_, so this value cannot
    // change for the remainder of the call.
    const size_t current_index =
        current_index_.load(std::memory_order_acquire);
    if (unlimited_updates_) {
      if (data_.size() < kCircularBufferSize) {
        // Still filling the reserved buffer: append without reallocating.
        data_.push_back(std::move(data));
        current_index_.store(data_.size() - 1, std::memory_order_release);
      } else {
        // Buffer is full: overwrite the slot after the current one, which
        // holds the oldest version.
        const size_t next_index = (current_index + 1) % kCircularBufferSize;
        data_[next_index] = std::move(data);
        current_index_.store(next_index, std::memory_order_release);
      }
      return absl::OkStatus();
    }
    // Indices are kept as size_t so the comparison against max_updates_ is
    // unsigned on both sides and nothing is narrowed.
    const size_t next_index = current_index + 1;
    if (next_index > max_updates_) {
      return absl::InternalError(absl::StrCat(
          "We have reached the maximum number of updates: ", max_updates_));
    }
    data_.push_back(std::move(data));
    // The release store publishes the element written above to any reader that
    // subsequently acquires this index.
    current_index_.store(next_index, std::memory_order_release);
    return absl::OkStatus();
  }

  // Returns a pointer to the current version without locking, or nullptr if
  // this object was default-constructed and therefore holds no value.
  const T* Get() const {
    // has_value_ is const and set at construction, so checking it does not
    // touch any state that Update() mutates.
    if (!has_value_) {
      return nullptr;
    }
    const size_t current_index =
        current_index_.load(std::memory_order_acquire);
    // Specifically in g3's vector implementation, operator[] obtains current
    // vector size(). This can lead to a data race. We can use the iterator to
    // directly access the value without checking the size since we can
    // guarantee we are within the vectors size.
    return &(*(data_.begin() + static_cast<std::ptrdiff_t>(current_index)));
  }

 private:
  // Number of slots used in unlimited mode before indices wrap around.
  static constexpr size_t kCircularBufferSize = 100;
  // Value of `max_updates` that selects unlimited mode.
  static constexpr size_t kUnlimitedUpdates = static_cast<size_t>(-1);

  const size_t max_updates_ = 0;
  const bool unlimited_updates_ = false;
  // True only when constructed with an initial value; lets Get() detect an
  // empty holder without reading the vector.
  const bool has_value_ = false;
  // Serializes writers; readers never take it.
  absl::Mutex mutex_;
  // Index of the current version in data_. Explicitly initialized so that a
  // default-constructed object does not hold an indeterminate value.
  std::atomic<size_t> current_index_{0};
  // We will ALWAYS reserve the maximum number of updates, so we can guarantee
  // all pointers will always be valid
  std::vector<T> data_;
};
} // namespace milotic_tlbmc
#endif // THIRD_PARTY_MILOTIC_EXTERNAL_CC_TLBMC_RCU_SIMPLE_RCU_H_