From 91d274b78314b2b56bdbbf5c10ebbba43d3d315b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E7=85=9C?= <9930261+zhuyu901115@user.noreply.gitee.com> Date: Fri, 13 Jun 2025 09:17:19 +0800 Subject: [PATCH] =?UTF-8?q?trace=5Freplay=E6=96=B0=E5=A2=9Eio=5Ftracer?= =?UTF-8?q?=E5=8A=9F=E8=83=BD=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rocksdb/rocksdb/trace_replay/io_tracer.cc | 228 ++++++++++++++++++ .../rocksdb/rocksdb/trace_replay/io_tracer.h | 174 +++++++++++++ .../rocksdb/trace_replay/io_tracer_test.cc | 215 +++++++++++++++++ 3 files changed, 617 insertions(+) create mode 100644 storage/rocksdb/rocksdb/trace_replay/io_tracer.cc create mode 100644 storage/rocksdb/rocksdb/trace_replay/io_tracer.h create mode 100644 storage/rocksdb/rocksdb/trace_replay/io_tracer_test.cc diff --git a/storage/rocksdb/rocksdb/trace_replay/io_tracer.cc b/storage/rocksdb/rocksdb/trace_replay/io_tracer.cc new file mode 100644 index 000000000..b1ae8f222 --- /dev/null +++ b/storage/rocksdb/rocksdb/trace_replay/io_tracer.cc @@ -0,0 +1,228 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "trace_replay/io_tracer.h" + +#include +#include +#include + +#include "db/db_impl/db_impl.h" +#include "db/dbformat.h" +#include "rocksdb/slice.h" +#include "util/coding.h" +#include "util/hash.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { +IOTraceWriter::IOTraceWriter(Env* env, const TraceOptions& trace_options, + std::unique_ptr&& trace_writer) + : env_(env), + trace_options_(trace_options), + trace_writer_(std::move(trace_writer)) {} + +Status IOTraceWriter::WriteIOOp(const IOTraceRecord& record) { + uint64_t trace_file_size = trace_writer_->GetFileSize(); + if (trace_file_size > trace_options_.max_trace_file_size) { + return Status::OK(); + } + Trace trace; + trace.ts = record.access_timestamp; + trace.type = record.trace_type; + Slice file_operation(record.file_operation); + PutLengthPrefixedSlice(&trace.payload, file_operation); + PutFixed64(&trace.payload, record.latency); + Slice io_status(record.io_status); + PutLengthPrefixedSlice(&trace.payload, io_status); + /* Write remaining options based on trace_type set by file operation */ + switch (record.trace_type) { + case TraceType::kIOGeneral: + break; + case TraceType::kIOFileNameAndFileSize: + PutFixed64(&trace.payload, record.file_size); + FALLTHROUGH_INTENDED; + case TraceType::kIOFileName: { + Slice file_name(record.file_name); + PutLengthPrefixedSlice(&trace.payload, file_name); + break; + } + case TraceType::kIOLenAndOffset: + PutFixed64(&trace.payload, record.offset); + FALLTHROUGH_INTENDED; + case TraceType::kIOLen: + PutFixed64(&trace.payload, record.len); + break; + default: + assert(false); + } + std::string encoded_trace; + TracerHelper::EncodeTrace(trace, &encoded_trace); + return trace_writer_->Write(encoded_trace); +} + +Status IOTraceWriter::WriteHeader() { + Trace trace; + trace.ts = env_->NowMicros(); + trace.type = TraceType::kTraceBegin; + PutLengthPrefixedSlice(&trace.payload, kTraceMagic); + PutFixed32(&trace.payload, kMajorVersion); + PutFixed32(&trace.payload, kMinorVersion); + std::string encoded_trace; + TracerHelper::EncodeTrace(trace, &encoded_trace); + return trace_writer_->Write(encoded_trace); +} + +IOTraceReader::IOTraceReader(std::unique_ptr&& reader) + : trace_reader_(std::move(reader)) {} + +Status IOTraceReader::ReadHeader(IOTraceHeader* header) { + assert(header != nullptr); + std::string encoded_trace; + Status s = trace_reader_->Read(&encoded_trace); + if (!s.ok()) { + return s; + } + Trace trace; + s = TracerHelper::DecodeTrace(encoded_trace, &trace); + if (!s.ok()) { + return s; + } + header->start_time = trace.ts; + Slice enc_slice = Slice(trace.payload); + Slice magic_number; + if (!GetLengthPrefixedSlice(&enc_slice, &magic_number)) { + return Status::Corruption( + "Corrupted header in the trace file: Failed to read the magic number."); + } + if (magic_number.ToString() != kTraceMagic) { + return Status::Corruption( + "Corrupted header in the trace file: Magic number does not match."); + } + if (!GetFixed32(&enc_slice, &header->rocksdb_major_version)) { + return Status::Corruption( + "Corrupted header in the trace file: Failed to read rocksdb major " + "version number."); + } + if (!GetFixed32(&enc_slice, &header->rocksdb_minor_version)) { + return Status::Corruption( + "Corrupted header in the trace file: Failed to read rocksdb minor " + "version number."); + } + // We should have retrieved all information in the header. + if (!enc_slice.empty()) { + return Status::Corruption( + "Corrupted header in the trace file: The length of header is too " + "long."); + } + return Status::OK(); +} + +Status IOTraceReader::ReadIOOp(IOTraceRecord* record) { + assert(record); + std::string encoded_trace; + Status s = trace_reader_->Read(&encoded_trace); + if (!s.ok()) { + return s; + } + Trace trace; + s = TracerHelper::DecodeTrace(encoded_trace, &trace); + if (!s.ok()) { + return s; + } + record->access_timestamp = trace.ts; + record->trace_type = trace.type; + Slice enc_slice = Slice(trace.payload); + + Slice file_operation; + if (!GetLengthPrefixedSlice(&enc_slice, &file_operation)) { + return Status::Incomplete( + "Incomplete access record: Failed to read file operation."); + } + record->file_operation = file_operation.ToString(); + if (!GetFixed64(&enc_slice, &record->latency)) { + return Status::Incomplete( + "Incomplete access record: Failed to read latency."); + } + Slice io_status; + if (!GetLengthPrefixedSlice(&enc_slice, &io_status)) { + return Status::Incomplete( + "Incomplete access record: Failed to read IO status."); + } + record->io_status = io_status.ToString(); + /* Read remaining options based on trace_type set by file operation */ + switch (record->trace_type) { + case TraceType::kIOGeneral: + break; + case TraceType::kIOFileNameAndFileSize: + if (!GetFixed64(&enc_slice, &record->file_size)) { + return Status::Incomplete( + "Incomplete access record: Failed to read file size."); + } + FALLTHROUGH_INTENDED; + case TraceType::kIOFileName: { + Slice file_name; + if (!GetLengthPrefixedSlice(&enc_slice, &file_name)) { + return Status::Incomplete( + "Incomplete access record: Failed to read file name."); + } + record->file_name = file_name.ToString(); + break; + } + case TraceType::kIOLenAndOffset: + if (!GetFixed64(&enc_slice, &record->offset)) { + return Status::Incomplete( + "Incomplete access record: Failed to read offset."); + } + FALLTHROUGH_INTENDED; + case TraceType::kIOLen: { + if (!GetFixed64(&enc_slice, &record->len)) { + return Status::Incomplete( + "Incomplete access record: Failed to read length."); + } + break; + } + default: + assert(false); + } + return Status::OK(); +} + +IOTracer::IOTracer() : tracing_enabled(false) { writer_.store(nullptr); } + +IOTracer::~IOTracer() { EndIOTrace(); } + +Status IOTracer::StartIOTrace(Env* env, const TraceOptions& trace_options, + std::unique_ptr&& trace_writer) { + InstrumentedMutexLock lock_guard(&trace_writer_mutex_); + if (writer_.load()) { + return Status::Busy(); + } + trace_options_ = trace_options; + writer_.store(new IOTraceWriter(env, trace_options, std::move(trace_writer))); + tracing_enabled = true; + return writer_.load()->WriteHeader(); +} + +void IOTracer::EndIOTrace() { + InstrumentedMutexLock lock_guard(&trace_writer_mutex_); + if (!writer_.load()) { + return; + } + delete writer_.load(); + writer_.store(nullptr); + tracing_enabled = false; +} + +Status IOTracer::WriteIOOp(const IOTraceRecord& record) { + if (!writer_.load()) { + return Status::OK(); + } + InstrumentedMutexLock lock_guard(&trace_writer_mutex_); + if (!writer_.load()) { + return Status::OK(); + } + return writer_.load()->WriteIOOp(record); +} +} // namespace ROCKSDB_NAMESPACE diff --git a/storage/rocksdb/rocksdb/trace_replay/io_tracer.h b/storage/rocksdb/rocksdb/trace_replay/io_tracer.h new file mode 100644 index 000000000..36be4c602 --- /dev/null +++ b/storage/rocksdb/rocksdb/trace_replay/io_tracer.h @@ -0,0 +1,174 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include +#include + +#include "monitoring/instrumented_mutex.h" +#include "rocksdb/env.h" +#include "rocksdb/options.h" +#include "rocksdb/trace_reader_writer.h" +#include "trace_replay/trace_replay.h" + +namespace ROCKSDB_NAMESPACE { + +struct IOTraceRecord { + // Required fields for all accesses. + uint64_t access_timestamp = 0; + TraceType trace_type = TraceType::kTraceMax; + std::string file_operation; + uint64_t latency = 0; + std::string io_status; + // Required fields for read. + std::string file_name; + uint64_t len = 0; + uint64_t offset = 0; + uint64_t file_size = 0; + + IOTraceRecord() {} + + IOTraceRecord(const uint64_t& _access_timestamp, const TraceType& _trace_type, + const std::string& _file_operation, const uint64_t& _latency, + const std::string& _io_status, const std::string& _file_name) + : access_timestamp(_access_timestamp), + trace_type(_trace_type), + file_operation(_file_operation), + latency(_latency), + io_status(_io_status), + file_name(_file_name) {} + + IOTraceRecord(const uint64_t& _access_timestamp, const TraceType& _trace_type, + const std::string& _file_operation, const uint64_t& _latency, + const std::string& _io_status, const std::string& _file_name, + const uint64_t& _file_size) + : access_timestamp(_access_timestamp), + trace_type(_trace_type), + file_operation(_file_operation), + latency(_latency), + io_status(_io_status), + file_name(_file_name), + file_size(_file_size) {} + + IOTraceRecord(const uint64_t& _access_timestamp, const TraceType& _trace_type, + const std::string& _file_operation, const uint64_t& _latency, + const std::string& _io_status, const uint64_t& _len = 0, + const uint64_t& _offset = 0) + : access_timestamp(_access_timestamp), + trace_type(_trace_type), + file_operation(_file_operation), + latency(_latency), + io_status(_io_status), + len(_len), + offset(_offset) {} +}; + +struct IOTraceHeader { + uint64_t start_time; + uint32_t rocksdb_major_version; + uint32_t rocksdb_minor_version; +}; + +// IOTraceWriter writes IO operation as a single trace. Each trace will have a +// timestamp and type, followed by the trace payload. +class IOTraceWriter { + public: + IOTraceWriter(Env* env, const TraceOptions& trace_options, + std::unique_ptr&& trace_writer); + ~IOTraceWriter() = default; + // No copy and move. + IOTraceWriter(const IOTraceWriter&) = delete; + IOTraceWriter& operator=(const IOTraceWriter&) = delete; + IOTraceWriter(IOTraceWriter&&) = delete; + IOTraceWriter& operator=(IOTraceWriter&&) = delete; + + Status WriteIOOp(const IOTraceRecord& record); + + // Write a trace header at the beginning, typically on initiating a trace, + // with some metadata like a magic number and RocksDB version. + Status WriteHeader(); + + private: + Env* env_; + TraceOptions trace_options_; + std::unique_ptr trace_writer_; +}; + +// IOTraceReader helps read the trace file generated by IOTraceWriter. +class IOTraceReader { + public: + explicit IOTraceReader(std::unique_ptr&& reader); + ~IOTraceReader() = default; + // No copy and move. + IOTraceReader(const IOTraceReader&) = delete; + IOTraceReader& operator=(const IOTraceReader&) = delete; + IOTraceReader(IOTraceReader&&) = delete; + IOTraceReader& operator=(IOTraceReader&&) = delete; + + Status ReadHeader(IOTraceHeader* header); + + Status ReadIOOp(IOTraceRecord* record); + + private: + std::unique_ptr trace_reader_; +}; + +// An IO tracer. It uses IOTraceWriter to write the access record to the +// trace file. +class IOTracer { + public: + IOTracer(); + ~IOTracer(); + // No copy and move. + IOTracer(const IOTracer&) = delete; + IOTracer& operator=(const IOTracer&) = delete; + IOTracer(IOTracer&&) = delete; + IOTracer& operator=(IOTracer&&) = delete; + + // no_sanitize is added for tracing_enabled. writer_ is protected under mutex + // so even if user call Start/EndIOTrace and tracing_enabled is not updated in + // the meanwhile, WriteIOOp will anyways check the writer_ protected under + // mutex and ignore the operation if writer_is null. So its ok if + // tracing_enabled shows non updated value. + +#if defined(__clang__) +#if defined(__has_feature) && __has_feature(thread_sanitizer) +#define TSAN_SUPPRESSION __attribute__((no_sanitize("thread"))) +#endif // __has_feature(thread_sanitizer) +#else // __clang__ +#ifdef __SANITIZE_THREAD__ +#define TSAN_SUPPRESSION __attribute__((no_sanitize("thread"))) +#endif // __SANITIZE_THREAD__ +#endif // __clang__ + +#ifndef TSAN_SUPPRESSION +#define TSAN_SUPPRESSION +#endif // TSAN_SUPPRESSION + + // Start writing IO operations to the trace_writer. + TSAN_SUPPRESSION Status + StartIOTrace(Env* env, const TraceOptions& trace_options, + std::unique_ptr&& trace_writer); + + // Stop writing IO operations to the trace_writer. + TSAN_SUPPRESSION void EndIOTrace(); + + TSAN_SUPPRESSION bool is_tracing_enabled() const { return tracing_enabled; } + + Status WriteIOOp(const IOTraceRecord& record); + + private: + TraceOptions trace_options_; + // A mutex protects the writer_. + InstrumentedMutex trace_writer_mutex_; + std::atomic writer_; + // bool tracing_enabled is added to avoid costly operation of checking atomic + // variable 'writer_' is nullptr or not in is_tracing_enabled(). + // is_tracing_enabled() is invoked multiple times by FileSystem classes. + bool tracing_enabled; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/storage/rocksdb/rocksdb/trace_replay/io_tracer_test.cc b/storage/rocksdb/rocksdb/trace_replay/io_tracer_test.cc new file mode 100644 index 000000000..3e1602ea1 --- /dev/null +++ b/storage/rocksdb/rocksdb/trace_replay/io_tracer_test.cc @@ -0,0 +1,215 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "trace_replay/io_tracer.h" + +#include "rocksdb/env.h" +#include "rocksdb/status.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" + +namespace ROCKSDB_NAMESPACE { + +namespace { +const std::string kDummyFile = "/dummy/file"; + +} // namespace + +class IOTracerTest : public testing::Test { + public: + IOTracerTest() { + test_path_ = test::PerThreadDBPath("io_tracer_test"); + env_ = ROCKSDB_NAMESPACE::Env::Default(); + EXPECT_OK(env_->CreateDir(test_path_)); + trace_file_path_ = test_path_ + "/io_trace"; + } + + ~IOTracerTest() override { + EXPECT_OK(env_->DeleteFile(trace_file_path_)); + EXPECT_OK(env_->DeleteDir(test_path_)); + } + + std::string GetFileOperation(uint64_t id) { + id = id % 4; + switch (id) { + case 0: + return "CreateDir"; + case 1: + return "GetChildren"; + case 2: + return "FileSize"; + case 3: + return "DeleteDir"; + default: + assert(false); + } + return ""; + } + + void WriteIOOp(IOTraceWriter* writer, uint64_t nrecords) { + assert(writer); + for (uint64_t i = 0; i < nrecords; i++) { + IOTraceRecord record; + record.trace_type = TraceType::kIOLenAndOffset; + record.file_operation = GetFileOperation(i); + record.io_status = IOStatus::OK().ToString(); + record.file_name = kDummyFile + std::to_string(i); + record.len = i; + record.offset = i + 20; + ASSERT_OK(writer->WriteIOOp(record)); + } + } + + void VerifyIOOp(IOTraceReader* reader, uint32_t nrecords) { + assert(reader); + for (uint32_t i = 0; i < nrecords; i++) { + IOTraceRecord record; + ASSERT_OK(reader->ReadIOOp(&record)); + ASSERT_EQ(record.file_operation, GetFileOperation(i)); + ASSERT_EQ(record.io_status, IOStatus::OK().ToString()); + ASSERT_EQ(record.len, i); + ASSERT_EQ(record.offset, i + 20); + } + } + + Env* env_; + EnvOptions env_options_; + std::string trace_file_path_; + std::string test_path_; +}; + +TEST_F(IOTracerTest, AtomicWrite) { + std::string file_name = kDummyFile + std::to_string(0); + { + IOTraceRecord record(0, TraceType::kIOFileName, GetFileOperation(0), 0, + IOStatus::OK().ToString(), file_name); + TraceOptions trace_opt; + std::unique_ptr trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, + &trace_writer)); + IOTracer writer; + ASSERT_OK(writer.StartIOTrace(env_, trace_opt, std::move(trace_writer))); + ASSERT_OK(writer.WriteIOOp(record)); + ASSERT_OK(env_->FileExists(trace_file_path_)); + } + { + // Verify trace file contains one record. + std::unique_ptr trace_reader; + ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_, + &trace_reader)); + IOTraceReader reader(std::move(trace_reader)); + IOTraceHeader header; + ASSERT_OK(reader.ReadHeader(&header)); + ASSERT_EQ(kMajorVersion, static_cast(header.rocksdb_major_version)); + ASSERT_EQ(kMinorVersion, static_cast(header.rocksdb_minor_version)); + // Read record and verify data. + IOTraceRecord access_record; + ASSERT_OK(reader.ReadIOOp(&access_record)); + ASSERT_EQ(access_record.file_operation, GetFileOperation(0)); + ASSERT_EQ(access_record.io_status, IOStatus::OK().ToString()); + ASSERT_EQ(access_record.file_name, file_name); + ASSERT_NOK(reader.ReadIOOp(&access_record)); + } +} + +TEST_F(IOTracerTest, AtomicWriteBeforeStartTrace) { + { + IOTraceRecord record(0, TraceType::kIOGeneral, GetFileOperation(0), 0, + IOStatus::OK().ToString()); + std::unique_ptr trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, + &trace_writer)); + IOTracer writer; + // The record should not be written to the trace_file since StartIOTrace is + // not called. + ASSERT_OK(writer.WriteIOOp(record)); + ASSERT_OK(env_->FileExists(trace_file_path_)); + } + { + // Verify trace file contains nothing. + std::unique_ptr trace_reader; + ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_, + &trace_reader)); + IOTraceReader reader(std::move(trace_reader)); + IOTraceHeader header; + ASSERT_NOK(reader.ReadHeader(&header)); + } +} + +TEST_F(IOTracerTest, AtomicNoWriteAfterEndTrace) { + { + IOTraceRecord record(0, TraceType::kIOFileNameAndFileSize, + GetFileOperation(2), 0 /*latency*/, + IOStatus::OK().ToString(), "", 10 /*file_size*/); + TraceOptions trace_opt; + std::unique_ptr trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, + &trace_writer)); + IOTracer writer; + ASSERT_OK(writer.StartIOTrace(env_, trace_opt, std::move(trace_writer))); + ASSERT_OK(writer.WriteIOOp(record)); + writer.EndIOTrace(); + // Write the record again. This time the record should not be written since + // EndIOTrace is called. + ASSERT_OK(writer.WriteIOOp(record)); + ASSERT_OK(env_->FileExists(trace_file_path_)); + } + { + // Verify trace file contains one record. + std::unique_ptr trace_reader; + ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_, + &trace_reader)); + IOTraceReader reader(std::move(trace_reader)); + IOTraceHeader header; + ASSERT_OK(reader.ReadHeader(&header)); + ASSERT_EQ(kMajorVersion, static_cast(header.rocksdb_major_version)); + ASSERT_EQ(kMinorVersion, static_cast(header.rocksdb_minor_version)); + + IOTraceRecord access_record; + ASSERT_OK(reader.ReadIOOp(&access_record)); + ASSERT_EQ(access_record.file_operation, GetFileOperation(2)); + ASSERT_EQ(access_record.io_status, IOStatus::OK().ToString()); + ASSERT_EQ(access_record.file_size, 10); + // No more record. + ASSERT_NOK(reader.ReadIOOp(&access_record)); + } +} + +TEST_F(IOTracerTest, AtomicMultipleWrites) { + { + TraceOptions trace_opt; + std::unique_ptr trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, + &trace_writer)); + IOTraceWriter writer(env_, trace_opt, std::move(trace_writer)); + ASSERT_OK(writer.WriteHeader()); + // Write 10 records + WriteIOOp(&writer, 10); + ASSERT_OK(env_->FileExists(trace_file_path_)); + } + + { + // Verify trace file is generated correctly. + std::unique_ptr trace_reader; + ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_, + &trace_reader)); + IOTraceReader reader(std::move(trace_reader)); + IOTraceHeader header; + ASSERT_OK(reader.ReadHeader(&header)); + ASSERT_EQ(kMajorVersion, static_cast(header.rocksdb_major_version)); + ASSERT_EQ(kMinorVersion, static_cast(header.rocksdb_minor_version)); + // Read 10 records. + VerifyIOOp(&reader, 10); + // Read one more and record and it should report error. + IOTraceRecord record; + ASSERT_NOK(reader.ReadIOOp(&record)); + } +} +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} -- Gitee