diff --git a/storage/rocksdb/rocksdb/test_util/sync_point_impl.cc b/storage/rocksdb/rocksdb/test_util/sync_point_impl.cc new file mode 100644 index 0000000000000000000000000000000000000000..e1877e39860df1860f4b4bc643349bd60eebe32c --- /dev/null +++ b/storage/rocksdb/rocksdb/test_util/sync_point_impl.cc @@ -0,0 +1,129 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "test_util/sync_point_impl.h" + +#ifndef NDEBUG +namespace ROCKSDB_NAMESPACE { + +void TestKillRandom(std::string kill_point, int odds, + const std::string& srcfile, int srcline) { + for (auto& p : rocksdb_kill_exclude_prefixes) { + if (kill_point.substr(0, p.length()) == p) { + return; + } + } + + assert(odds > 0); + if (odds % 7 == 0) { + // class Random uses multiplier 16807, which is 7^5. If odds are + // multiplier of 7, there might be limited values generated. + odds++; + } + auto* r = Random::GetTLSInstance(); + bool crash = r->OneIn(odds); + if (crash) { + port::Crash(srcfile, srcline); + } +} + + +void SyncPoint::Data::LoadDependency(const std::vector& dependencies) { + std::lock_guard lock(mutex_); + successors_.clear(); + predecessors_.clear(); + cleared_points_.clear(); + for (const auto& dependency : dependencies) { + successors_[dependency.predecessor].push_back(dependency.successor); + predecessors_[dependency.successor].push_back(dependency.predecessor); + } + cv_.notify_all(); +} + +void SyncPoint::Data::LoadDependencyAndMarkers( + const std::vector& dependencies, + const std::vector& markers) { + std::lock_guard lock(mutex_); + successors_.clear(); + predecessors_.clear(); + cleared_points_.clear(); + markers_.clear(); + marked_thread_id_.clear(); + for (const auto& dependency : dependencies) { + successors_[dependency.predecessor].push_back(dependency.successor); + predecessors_[dependency.successor].push_back(dependency.predecessor); + } + for (const auto& marker : markers) { + successors_[marker.predecessor].push_back(marker.successor); + predecessors_[marker.successor].push_back(marker.predecessor); + markers_[marker.predecessor].push_back(marker.successor); + } + cv_.notify_all(); +} + +bool SyncPoint::Data::PredecessorsAllCleared(const std::string& point) { + for (const auto& pred : predecessors_[point]) { + if (cleared_points_.count(pred) == 0) { + return false; + } + } + return true; +} + +void SyncPoint::Data::ClearCallBack(const std::string& point) { + std::unique_lock lock(mutex_); + while (num_callbacks_running_ > 0) { + cv_.wait(lock); + } + callbacks_.erase(point); +} + +void SyncPoint::Data::ClearAllCallBacks() { + std::unique_lock lock(mutex_); + while (num_callbacks_running_ > 0) { + cv_.wait(lock); + } + callbacks_.clear(); +} + +void SyncPoint::Data::Process(const std::string& point, void* cb_arg) { + if (!enabled_) { + return; + } + + std::unique_lock lock(mutex_); + auto thread_id = std::this_thread::get_id(); + + auto marker_iter = markers_.find(point); + if (marker_iter != markers_.end()) { + for (auto& marked_point : marker_iter->second) { + marked_thread_id_.emplace(marked_point, thread_id); + } + } + + if (DisabledByMarker(point, thread_id)) { + return; + } + + while (!PredecessorsAllCleared(point)) { + cv_.wait(lock); + if (DisabledByMarker(point, thread_id)) { + return; + } + } + + auto callback_pair = callbacks_.find(point); + if (callback_pair != callbacks_.end()) { + num_callbacks_running_++; + mutex_.unlock(); + callback_pair->second(cb_arg); + mutex_.lock(); + num_callbacks_running_--; + } + cleared_points_.insert(point); + cv_.notify_all(); +} +} // namespace ROCKSDB_NAMESPACE +#endif diff --git a/storage/rocksdb/rocksdb/test_util/sync_point_impl.h b/storage/rocksdb/rocksdb/test_util/sync_point_impl.h new file mode 100644 index 0000000000000000000000000000000000000000..b246c0198576f628acfd21fb2d7ec21a2e608c5b --- /dev/null +++ b/storage/rocksdb/rocksdb/test_util/sync_point_impl.h @@ -0,0 +1,74 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "test_util/sync_point.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "port/port.h" +#include "util/random.h" + +#pragma once + +#ifndef NDEBUG +namespace ROCKSDB_NAMESPACE { +struct SyncPoint::Data { + Data() : enabled_(false) {} + // Enable proper deletion by subclasses + virtual ~Data() {} + // successor/predecessor map loaded from LoadDependency + std::unordered_map> successors_; + std::unordered_map> predecessors_; + std::unordered_map > callbacks_; + std::unordered_map > markers_; + std::unordered_map marked_thread_id_; + + std::mutex mutex_; + std::condition_variable cv_; + // sync points that have been passed through + std::unordered_set cleared_points_; + std::atomic enabled_; + int num_callbacks_running_ = 0; + + void LoadDependency(const std::vector& dependencies); + void LoadDependencyAndMarkers(const std::vector& dependencies, + const std::vector& markers); + bool PredecessorsAllCleared(const std::string& point); + void SetCallBack(const std::string& point, + const std::function& callback) { + std::lock_guard lock(mutex_); + callbacks_[point] = callback; +} + + void ClearCallBack(const std::string& point); + void ClearAllCallBacks(); + void EnableProcessing() { + enabled_ = true; + } + void DisableProcessing() { + enabled_ = false; + } + void ClearTrace() { + std::lock_guard lock(mutex_); + cleared_points_.clear(); + } + bool DisabledByMarker(const std::string& point, + std::thread::id thread_id) { + auto marked_point_iter = marked_thread_id_.find(point); + return marked_point_iter != marked_thread_id_.end() && + thread_id != marked_point_iter->second; + } + void Process(const std::string& point, void* cb_arg); +}; +} // namespace ROCKSDB_NAMESPACE +#endif // NDEBUG