blob: ec4724c2fd7fb4f33dde8141dac2839a5fa57e33 [file] [log] [blame] [edit]
#include "utils/clock.h"
#include <stddef.h>
#include <cstdint>
#include <map>
#include <memory>
#include <optional>
#include <utility>
#include <vector>
#include "absl/base/nullability.h"
#include "absl/base/thread_annotations.h"
#include "absl/log/log.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
namespace util {
namespace {
// -----------------------------------------------------------------
// RealTimeClock
//
// This class is thread-safe.
class RealTimeClock final : public Clock {
public:
~RealTimeClock() override {
LOG(FATAL) << "RealTimeClock should never be destroyed";
}
absl::Time TimeNow() override { return absl::Now(); }
void Sleep(absl::Duration d) override { absl::SleepFor(d); }
void SleepUntil(absl::Time wakeup_time) override {
absl::Duration d = wakeup_time - TimeNow();
if (d > absl::ZeroDuration()) {
Sleep(d);
}
}
bool AwaitWithDeadline(absl::Mutex* mu, const absl::Condition& cond,
absl::Time deadline) override {
return mu->AwaitWithDeadline(cond, deadline);
}
};
} // namespace
Clock::~Clock() = default;
Clock* Clock::RealClock() {
static RealTimeClock* rtclock = new RealTimeClock;
return rtclock;
}
// -----------------------------------------------------------------
// SimulatedClock
//
// There are a few tricky details in the implementation of SimulatedClock.
//
// The external mutex that is passed as a param of AwaitWithDeadline() is not
// under the control of SimulatedClock; in particular, it can be destroyed any
// time after AwaitWithDeadline() returns. This requires the wakeup call from
// AdvanceTime() to avoid grabbing the external mutex if AwaitWithDeadline()
// has returned. This is accomplished by allowing the waiter to be cancelled
// via a bool guarded by a mutex in WakeUpInfo.
//
// Once AwaitWithDeadline() has released lock_, someone nefarious might call
// the SimulatedClock destructor, so it isn't possible for AwaitWithDeadline()
// to remove the wakeup call from waiters_ if the condition it is awaiting
// becomes true; it cancels the waiter instead. This means that,
// theoretically, many obsolete entries could pile up in waiters_ if
// AwaitWithDeadline() keeps being called but simulated time is not advanced.
// This seems unlikely to happen in practice.
//
// The WakeUpInfo in waiters_ are always awoken via WakeUp() (and
// removed) or cancelled before AwaitWithDeadline() returns. If a
// waiters_ value is cancelled then calling its WakeUp() method will
// short-circuit before touching the external mutex.
class SimulatedClock::WakeUpInfo {
public:
WakeUpInfo(absl::Mutex* mu, absl::Condition cond)
: mu_(mu),
cond_(cond),
wakeup_time_passed_(false),
cancelled_(false),
wakeup_called_(false) {}
void WakeUp() {
// If we are cancelled then AwaitWithDeadline may have returned, in which
// case we can't lock mu_.
{
absl::MutexLock lock(cancellation_mu_);
if (cancelled_) return;
wakeup_called_ = true;
}
absl::MutexLock lock(*mu_);
wakeup_time_passed_ = true;
}
void AwaitConditionOrWakeUp() {
mu_->Await(absl::Condition(this, &WakeUpInfo::Ready));
}
void CancelOrAwaitWakeUp() {
bool wakeup_called;
{
absl::MutexLock lock(cancellation_mu_);
cancelled_ = true;
wakeup_called = wakeup_called_;
}
if (wakeup_called && !wakeup_time_passed_) {
// Wait for WakeUp to complete.
//
// Note that this will unlock 'mu_'; this is actually necessary
// so that WakeUp() can unblock and complete. This does allow
// for 'cond_' to potentially change from true to false; that is
// OK, since WakeUp() is being called, so the deadline must be
// past, and so this method is fulfilling its duties. (Well, the
// destructor might be calling WakeUp(), but if you're deleting
// time itself while waiting for a deadline to pass, you deserve
// what you get.)
mu_->Await(absl::Condition(&wakeup_time_passed_));
}
}
private:
bool Ready() const { return wakeup_time_passed_ || cond_.Eval(); }
absl::Mutex* mu_;
absl::Condition cond_;
bool wakeup_time_passed_;
absl::Mutex cancellation_mu_;
bool cancelled_ ABSL_GUARDED_BY(cancellation_mu_);
bool wakeup_called_ ABSL_GUARDED_BY(cancellation_mu_);
};
SimulatedClock::SimulatedClock(absl::Time t)
: now_(t),
waiters_(new WaiterList),
num_await_calls_(0),
last_num_await_calls_(0) {}
SimulatedClock::~SimulatedClock() {
// Wake up all existing waiters.
WaiterList* waiters;
{
absl::MutexLock l(lock_);
waiters = waiters_;
waiters_ = nullptr; // Future Sleep() calls will crash
}
for (WaiterList::iterator iter = waiters->begin(); iter != waiters->end();
++iter) {
iter->second->WakeUp();
}
delete waiters;
}
absl::Time SimulatedClock::TimeNow() {
absl::ReaderMutexLock l(lock_);
return now_;
}
void SimulatedClock::Sleep(absl::Duration d) { SleepUntil(TimeNow() + d); }
int64_t SimulatedClock::SetTime(absl::Time t) ABSL_NO_THREAD_SAFETY_ANALYSIS {
return UpdateTime([this, t]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { now_ = t; });
}
int64_t SimulatedClock::AdvanceTime(absl::Duration d)
ABSL_NO_THREAD_SAFETY_ANALYSIS {
return UpdateTime([this, d]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { now_ += d; });
}
template <class T>
int64_t SimulatedClock::UpdateTime(const T& now_updater) {
// Deadlock could occur if UpdateTime() were to hold lock_ while waking up
// waiters, since waking up requires acquiring the external mutex, and
// AwaitWithDeadline() acquires the mutexes in the opposite order. So let's
// first grab all the wakeup callbacks, then release lock_, then call the
// callbacks.
std::vector<WaiterList::mapped_type> wakeup_calls;
lock_.lock();
now_updater(); // reset now_
WaiterList::iterator iter;
while (((iter = waiters_->begin()) != waiters_->end()) &&
(iter->first <= now_)) {
wakeup_calls.push_back(std::move(iter->second));
waiters_->erase(iter);
}
lock_.unlock();
for (const auto& wakeup_call : wakeup_calls) {
wakeup_call->WakeUp();
}
return wakeup_calls.size();
}
void SimulatedClock::SleepUntil(absl::Time wakeup_time) {
absl::Mutex mu;
absl::MutexLock lock(mu);
bool f = false;
AwaitWithDeadline(&mu, absl::Condition(&f), wakeup_time);
}
bool SimulatedClock::AwaitWithDeadline(absl::Mutex* mu,
const absl::Condition& cond,
absl::Time deadline) {
mu->AssertReaderHeld();
// Evaluate cond outside our own lock to minimize contention.
const bool ready = cond.Eval();
lock_.lock();
num_await_calls_++;
// Return now if the deadline is already past, or if the condition is true.
// This avoids creating a WakeUpInfo that won't be deleted until an
// appropriate UpdateTime() call.
if (deadline <= now_ || ready) {
lock_.unlock();
return ready;
}
auto wakeup_info = std::make_shared<WakeUpInfo>(mu, cond);
waiters_->insert(std::make_pair(deadline, wakeup_info));
lock_.unlock();
// SimulatedClock may be destroyed any time after this, so we can't
// acquire lock_ again.
// Wait until either cond.Eval() becomes true, or the deadline has passed.
wakeup_info->AwaitConditionOrWakeUp();
// Cancel the wakeup call, or if it's already in progress, wait for it to
// finish, since we must ensure no one touches 'mu' or 'cond' after we return.
wakeup_info->CancelOrAwaitWakeUp();
return cond.Eval();
}
void SimulatedClock::WaitUntilThreadsAsleep(int num_calls) {
absl::MutexLock l(lock_);
int64_t exp_await_calls = last_num_await_calls_ + num_calls;
auto cond = [this, exp_await_calls]() ABSL_SHARED_LOCKS_REQUIRED(lock_) {
// We should never be awaiting fewer events than have already
// occurred, otherwise this condition can never be satisfied.
LOG_IF(DFATAL, exp_await_calls < num_await_calls_)
<< "Equality condition can never be satisfied:" << " exp_await_calls="
<< exp_await_calls << " num_await_calls_=" << num_await_calls_;
return num_await_calls_ == exp_await_calls;
};
lock_.Await(absl::Condition(&cond));
last_num_await_calls_ = num_await_calls_;
}
void SimulatedClock::CatchUpThreadsAsleep() {
absl::MutexLock l(lock_);
last_num_await_calls_ = num_await_calls_;
}
std::optional<absl::Time> SimulatedClock::GetEarliestWakeupTime() const {
absl::ReaderMutexLock l(lock_);
if (waiters_->empty()) {
return std::nullopt;
}
return waiters_->begin()->first;
}
} // namespace util