From 8492c4f900ef453d28555abffdd9bcdd35a2973a Mon Sep 17 00:00:00 2001 From: fengyang Date: Sun, 18 Dec 2022 14:46:57 +0800 Subject: [PATCH 01/10] to_hash_with_seed --- src/util/hash.rs | 28 +++++++++++----------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/src/util/hash.rs b/src/util/hash.rs index 945d7f6..c9fce80 100644 --- a/src/util/hash.rs +++ b/src/util/hash.rs @@ -13,7 +13,7 @@ use crate::util::slice::Slice; pub trait ToHash { fn to_hash(&self) -> u32; - fn to_hash_seed(&self, seed: u32) -> u32; + fn to_hash_with_seed(&self, seed: u32) -> u32; } /// 所有基本类型 u8, i8, u16, u32 ... 的Vec都可以实现 hash 值计算 @@ -28,10 +28,10 @@ impl ToHash for Vec { v_v.to_hash() } - fn to_hash_seed(&self, seed: u32) -> u32 { + fn to_hash_with_seed(&self, seed: u32) -> u32 { let v_v = self.as_slice(); - v_v.to_hash_seed(seed) + v_v.to_hash_with_seed(seed) } } @@ -44,16 +44,10 @@ impl ToHash for Vec { impl ToHash for &[T] { #[inline] fn to_hash(&self) -> u32 { - let ptr_u8 = self.as_ptr() as *const _ as *const u8; - - let data = unsafe { - stds::from_raw_parts(ptr_u8, size_of::() * self.len()) - }; - - Hash::hash_code(data, HASH_DEFAULT_SEED) + self.to_hash_with_seed(HASH_DEFAULT_SEED) } - fn to_hash_seed(&self, seed: u32) -> u32 { + fn to_hash_with_seed(&self, seed: u32) -> u32 { let ptr_u8 = self.as_ptr() as *const _ as *const u8; let data = unsafe { @@ -71,10 +65,10 @@ impl ToHash for &[T] { /// ``` impl ToHash for &str { fn to_hash(&self) -> u32 { - Hash::hash_code(self.as_bytes(), HASH_DEFAULT_SEED) + self.to_hash_with_seed(HASH_DEFAULT_SEED) } - fn to_hash_seed(&self, seed: u32) -> u32 { + fn to_hash_with_seed(&self, seed: u32) -> u32 { Hash::hash_code(self.as_bytes(), seed) } } @@ -88,10 +82,10 @@ impl ToHash for &str { /// ``` impl ToHash for Slice { fn to_hash(&self) -> u32 { - Hash::hash_code(self.to_vec().as_slice(), HASH_DEFAULT_SEED) + self.to_hash_with_seed(HASH_DEFAULT_SEED) } - fn to_hash_seed(&self, seed: u32) -> u32 { + fn to_hash_with_seed(&self, seed: u32) -> u32 { Hash::hash_code(self.to_vec().as_slice(), seed) } } @@ -105,10 +99,10 @@ impl ToHash for Slice { /// ``` impl ToHash for String { fn to_hash(&self) -> u32 { - Hash::hash_code(self.as_bytes(), HASH_DEFAULT_SEED) + self.to_hash_with_seed(HASH_DEFAULT_SEED) } - fn to_hash_seed(&self, seed: u32) -> u32 { + fn to_hash_with_seed(&self, seed: u32) -> u32 { Hash::hash_code(self.as_bytes(), seed) } } -- Gitee From d0b2b64bff645e1de0d924e04528a896f30e68ab Mon Sep 17 00:00:00 2001 From: fengyang Date: Sun, 18 Dec 2022 16:34:13 +0800 Subject: [PATCH 02/10] bloom_filter from func --- src/util/filter_policy.rs | 20 +++++++++++++++++--- src/util/filter_policy_test.rs | 31 +++++++++++++++++++++++++------ 2 files changed, 42 insertions(+), 9 deletions(-) diff --git a/src/util/filter_policy.rs b/src/util/filter_policy.rs index 5a87f2e..af7c915 100644 --- a/src/util/filter_policy.rs +++ b/src/util/filter_policy.rs @@ -3,16 +3,30 @@ use crate::traits::filter_policy_trait::{FilterPolicy}; use crate::util::hash::{Hash, ToHash}; use crate::util::slice::Slice; +pub trait FromPolicy { + fn from_bits_per_key(&self) -> usize; + fn from_k(&self) -> usize; +} + pub struct BloomFilterPolicy { bits_per_key: usize, k: usize } impl<'a> BloomFilterPolicy { - pub fn bloom_hash(key: Slice) -> u32 { - key.to_hash() - // Hash::hash_code(key., 0xbc9f1d34) + key.to_hash_with_seed(0xbc9f1d34) + } +} + +/// get struct BloomFilterPolicy 属性 +impl FromPolicy for BloomFilterPolicy { + fn from_bits_per_key(&self) -> usize { + self.bits_per_key + } + + fn from_k(&self) -> usize { + self.k } } diff --git a/src/util/filter_policy_test.rs b/src/util/filter_policy_test.rs index b49448f..e3247fe 100644 --- a/src/util/filter_policy_test.rs +++ b/src/util/filter_policy_test.rs @@ -1,14 +1,33 @@ use std::ptr::null; +use crate::traits::filter_policy_trait::FilterPolicy; use crate::util::bloom_filter; -use crate::util::filter_policy::BloomFilterPolicy; +use crate::util::filter_policy::{BloomFilterPolicy, FromPolicy}; +use crate::util::slice::Slice; + +#[test] +fn test_bloom_hash() { + let val = "aabbccd"; + let slice: Slice = Slice::from_buf(val.as_bytes()); + + let hash_val = BloomFilterPolicy::bloom_hash(slice); + assert_eq!(hash_val, 2085241752); +} #[test] fn test_new() { - let bloom_filter = BloomFilterPolicy::new(8); - println!("hash:{}", "a"); - // assert_eq!(bloom_filter, null()); + let bloom_filter: BloomFilterPolicy = BloomFilterPolicy::new(8); + assert_eq!(bloom_filter.from_bits_per_key(), 8); + assert_eq!(bloom_filter.from_k(), 6); let bloom_filter = BloomFilterPolicy::new(800); - println!("hash:{}", "a"); + assert_eq!(bloom_filter.from_bits_per_key(), 800); + assert_eq!(bloom_filter.from_k(), 30); + +} -} \ No newline at end of file +// #[test] +// fn test_create_filter() { +// let bloom_filter: BloomFilterPolicy = BloomFilterPolicy::create_filter(8); +// println!("{}", "aa") +// +// } \ No newline at end of file -- Gitee From 78b9d006ba6e1f9f555fd8178558b1160b64759a Mon Sep 17 00:00:00 2001 From: zhengcheng Date: Mon, 19 Dec 2022 09:35:50 +0800 Subject: [PATCH 03/10] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E8=BF=9B=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 9222ff0..6b9b6ff 100644 --- a/README.md +++ b/README.md @@ -51,10 +51,10 @@ LSM tree 是许多 KV型或日志型数据库所依赖的核心实现,例如Bi | CRC | wangboo、lxd5866 | | | Env | lxd5866 | | | filter_policy | fengyang | 10% | -| Hash | fengyang | 100% | -| Histgram | kazeseiriou | | +| Hash | fengyang | 100% | +| Histgram | kazeseiriou | 100% | | loging | | | -| MutexLock | kazeseiriou | | +| MutexLock | kazeseiriou | 100% | | Random | colagy | | | Status | fengyang | 100% | | Slice | wangboo | | \ No newline at end of file -- Gitee From 659696bb65396d81d3552590c8d56f1849144db1 Mon Sep 17 00:00:00 2001 From: wangboo <5417808+wangboa@user.noreply.gitee.com> Date: Wed, 21 Dec 2022 21:05:01 +0800 Subject: [PATCH 04/10] add next stop jobs, add #[inline] notes to some short functions --- README.md | 28 +++++++++++++++++++++++----- src/util/arena.rs | 1 + src/util/crc.rs | 2 ++ src/util/slice.rs | 8 ++++++++ 4 files changed, 34 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 6b9b6ff..5d05fc2 100644 --- a/README.md +++ b/README.md @@ -40,21 +40,39 @@ LSM tree 是许多 KV型或日志型数据库所依赖的核心实现,例如Bi ## RoadMap 1. 1.0.0 版本, 完成 util 相关的内容 - | 功能模块 | 完成人 | 进度 | |-------------------------------|-----------------|------| -| Arena (Memory Management) | wangboo | | +| Arena (Memory Management) | wangboo | 100% | | bloom | fengyang | 10% | | Cache | colagy | | | Coding (Primitive Type SerDe) | colagy | | | Comparator | fengyang | 90% | -| CRC | wangboo、lxd5866 | | +| CRC | wangboo、lxd5866 | 100% | | Env | lxd5866 | | | filter_policy | fengyang | 10% | | Hash | fengyang | 100% | | Histgram | kazeseiriou | 100% | -| loging | | | +| loging | | | | MutexLock | kazeseiriou | 100% | | Random | colagy | | | Status | fengyang | 100% | -| Slice | wangboo | | \ No newline at end of file +| Slice | wangboo | 100% | + +2. 1.1.0 版本, 完成基础零部件 + +| 功能模块 | 完成人 | 进度 | +|------------------|-----|-----| +| skiplist | 未认领 | | +| MemTableIterator | 未认领 | | +| MemTable | 未认领 | | +| LogReader | 未认领 | | +| LogWriter | 未认领 | | +| TableCache | 未认领 | | +| FileMetaData | 未认领 | | +| VersionEdit | 未认领 | | +| VersionSet | 未认领 | | +| ReadOptions | 未认领 | | +| WriteOptions | 未认领 | | +| WriteBatch | 未认领 | | + +3. 1.2.0 版本, 完成核心组件 \ No newline at end of file diff --git a/src/util/arena.rs b/src/util/arena.rs index 611053b..3f46dcb 100644 --- a/src/util/arena.rs +++ b/src/util/arena.rs @@ -58,6 +58,7 @@ impl Arena { return self.allocate_fallback(bytes, align); } + #[inline] pub fn memory_usage(&self) -> usize { self.memory_usage } diff --git a/src/util/crc.rs b/src/util/crc.rs index c0f8b34..cf7918b 100644 --- a/src/util/crc.rs +++ b/src/util/crc.rs @@ -240,6 +240,8 @@ const K_STRIDE_EXTENSION_TABLE3: [u32; 256] = [ /// 可以被计算 crc 值的特质 /// 默认实现了 &[T], Vec[T], Slice, &str, String pub trait AsCrc { + + #[inline] fn as_crc(&self) -> u32 { self.as_crc_extend(0) } diff --git a/src/util/slice.rs b/src/util/slice.rs index c9ddf7f..7c0bbb3 100644 --- a/src/util/slice.rs +++ b/src/util/slice.rs @@ -14,6 +14,7 @@ extern { impl Default for Slice { /// 构造一个空的 Slice + #[inline] fn default() -> Self { Self { data: Vec::new() @@ -24,6 +25,7 @@ impl Default for Slice { impl Slice { /// 从 &mut [u8] 转到 Slice, 这里存在内存拷贝开销 + #[inline] pub fn from_buf(buf: &[u8]) -> Self { Self { data: buf.to_owned() @@ -95,6 +97,7 @@ impl<'a> Slice { impl From for String { /// 将 Slice 内数据的所有权移交给 String + #[inline] fn from(s: Slice) -> Self { unsafe { String::from_utf8_unchecked(s.data) @@ -103,12 +106,14 @@ impl From for String { } impl From for Vec { + #[inline] fn from(s: Slice) -> Self { s.data } } impl > From for Slice { + #[inline] fn from(r: R) -> Self { Self { data: Vec::from(r.as_ref()) @@ -118,6 +123,7 @@ impl > From for Slice { impl PartialEq for Slice { /// 判断两个 Slice 是否相同 + #[inline] fn eq(&self, other: &Self) -> bool { return self.size() == other.size() && unsafe { memcmp( @@ -158,6 +164,7 @@ impl core::ops::Index for Slice { type Output = u8; /// 获取某个下标的数据 + #[inline] fn index(&self, index: usize) -> &Self::Output { assert!(index < self.size()); &(**self)[index] @@ -168,6 +175,7 @@ impl Deref for Slice { type Target = [u8]; /// Slice 解引用到 &[u8] + #[inline] fn deref(&self) -> &Self::Target { &*self.data } -- Gitee From b13ffe421019161c3bf5f8ce7e62192d25f0e253 Mon Sep 17 00:00:00 2001 From: fengyang Date: Wed, 4 Jan 2023 10:18:43 +0800 Subject: [PATCH 05/10] add inline in some func --- src/util/hash.rs | 12 ++++++++++++ src/util/status.rs | 4 ++++ 2 files changed, 16 insertions(+) diff --git a/src/util/hash.rs b/src/util/hash.rs index c9fce80..7d111c7 100644 --- a/src/util/hash.rs +++ b/src/util/hash.rs @@ -11,8 +11,10 @@ use crate::util::slice::Slice; /// 一种可以计算 hash 的特质 pub trait ToHash { + #[inline] fn to_hash(&self) -> u32; + #[inline] fn to_hash_with_seed(&self, seed: u32) -> u32; } @@ -22,12 +24,14 @@ pub trait ToHash { /// let hash = vec!['a','b','c'].to_hash(); /// ``` impl ToHash for Vec { + #[inline] fn to_hash(&self) -> u32 { let v_v = self.as_slice(); v_v.to_hash() } + #[inline] fn to_hash_with_seed(&self, seed: u32) -> u32 { let v_v = self.as_slice(); @@ -47,6 +51,7 @@ impl ToHash for &[T] { self.to_hash_with_seed(HASH_DEFAULT_SEED) } + #[inline] fn to_hash_with_seed(&self, seed: u32) -> u32 { let ptr_u8 = self.as_ptr() as *const _ as *const u8; @@ -64,10 +69,12 @@ impl ToHash for &[T] { /// let hash = "abc".to_hash(); /// ``` impl ToHash for &str { + #[inline] fn to_hash(&self) -> u32 { self.to_hash_with_seed(HASH_DEFAULT_SEED) } + #[inline] fn to_hash_with_seed(&self, seed: u32) -> u32 { Hash::hash_code(self.as_bytes(), seed) } @@ -81,10 +88,12 @@ impl ToHash for &str { /// let slice_hash_val = slice.to_hash(); /// ``` impl ToHash for Slice { + #[inline] fn to_hash(&self) -> u32 { self.to_hash_with_seed(HASH_DEFAULT_SEED) } + #[inline] fn to_hash_with_seed(&self, seed: u32) -> u32 { Hash::hash_code(self.to_vec().as_slice(), seed) } @@ -98,10 +107,12 @@ impl ToHash for Slice { /// let string_hash_val = val_s.to_hash(); /// ``` impl ToHash for String { + #[inline] fn to_hash(&self) -> u32 { self.to_hash_with_seed(HASH_DEFAULT_SEED) } + #[inline] fn to_hash_with_seed(&self, seed: u32) -> u32 { Hash::hash_code(self.as_bytes(), seed) } @@ -111,6 +122,7 @@ impl ToHash for String { pub struct Hash {} impl Hash { + #[inline] pub fn hash_code(data: &[u8], seed: u32) -> u32 { let murmur_hash: u32 = 0xc6a4a793; let r: u32 = 24; diff --git a/src/util/status.rs b/src/util/status.rs index 9bb6662..87a5381 100644 --- a/src/util/status.rs +++ b/src/util/status.rs @@ -13,6 +13,7 @@ pub struct Status { } impl Default for Status { + #[inline] fn default() -> Self { LevelError::ok() } @@ -306,6 +307,7 @@ impl LevelError { } impl Default for LevelError { + #[inline] fn default() -> LevelError { KOk } @@ -328,6 +330,7 @@ impl TryFrom for LevelError { /// let rs: LevelError = LevelError::try_from(3)?; /// assert!(&rs.is_not_supported_error()); /// ``` + #[inline] fn try_from(value: i32) -> Result { match value { 0 => Ok(KOk), @@ -343,6 +346,7 @@ impl TryFrom for LevelError { } impl Display for LevelError { + #[inline] fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let mut print = String::new(); -- Gitee From c28bac4515cba7df2c956b572cc3d5f5b1320fba Mon Sep 17 00:00:00 2001 From: wangboo <5417808+wangboa@user.noreply.gitee.com> Date: Wed, 4 Jan 2023 20:54:44 +0800 Subject: [PATCH 06/10] add log reader and writer --- 1.bin | Bin 0 -> 5750 bytes README.md | 28 +++++----- src/db/log_reader.rs | 114 ++++++++++++++++++++++++++++++++++++++++ src/db/log_wr_test.rs | 61 +++++++++++++++++++++ src/db/log_writer.rs | 98 ++++++++++++++++++++++++++++++++++ src/db/mod.rs | 3 ++ src/lib.rs | 2 + src/util/coding.rs | 1 - src/util/crc.rs | 28 +++++++++- src/util/crc_test.rs | 13 ++++- src/util/mod.rs | 8 +-- src/util/slice.rs | 14 +++++ src/util/status.rs | 38 +++++++++----- src/util/status_test.rs | 1 - 14 files changed, 373 insertions(+), 36 deletions(-) create mode 100644 1.bin create mode 100644 src/db/log_reader.rs create mode 100644 src/db/log_wr_test.rs create mode 100644 src/db/log_writer.rs diff --git a/1.bin b/1.bin new file mode 100644 index 0000000000000000000000000000000000000000..ae41623ec551be8d83267e117d57ff87f5865988 GIT binary patch literal 5750 zcmb2_U-OQUfze=o)!Py#AZ@twho(0(h+z~I7|6l`W*HX=FwAC!Fin;;Zs1{qvQ6Kg z`!2~2*t2nW}LI$r2b0(oxb7b)=v9OX8Pmv>_#&Tst&0BdwiVU0; z84VN&sxtIA_4|k-VP!_QnY;s)h)`$T_)%xGGBFBGHt=m>S0PHJ>GUP;e5%ALHMe6}uWT1*bdckR?8QIDzI)J01Aq-rwzej~rYfMi|f*VD9h4N2E#VZw5* z!-x!hmQ!x3{x>E^qrpX!<#SEQ(rNhT)QoGUwv-uRATIHu z$BsfX4A)OMZEa7fAw~+KtkWDQHpTdu2-|K)%8fBuw?(PYi3;YJ+AS&+a;AnsW^HF) zd%IA@By$h7d(E!YG0LL$*Of(XR5HtwH6i`3JGBfmShsV(st46fGyIbhpzcXMq?rQV)HKkf{F0-M4^>Sx{a7p2=u2HA&9)u))b^vYndV^%b$0&LHq;_M zG)Xal>ZV#=UADC^kov|NB;5WmGl&M}8m^X-l?xKXS~I_bRCZ<8k%pp>U#f%Xj)oe zkYudI7DH1T4DS?{%Er>x3M2J9!8vg>w!@gYMIj=d)|Qyu_W0A7KyzD6Q<56g5@~OZ g*{OY7ijwGHkGa, + checksum: bool, + read_pos: usize, + eof: bool, + buf: [u8; K_BLOCK_SIZE], + buf_len: usize, + buf_read_idx: usize, +} + +pub trait SeekableReader: Read + Seek {} +// pub type SeekableReader = dyn Read + Seek; + +impl SeekableReader for File {} + +impl LogReader { + pub fn new(mut file_reader: Box, checksum: bool, initial_offset: usize) -> LogReader { + let offset_in_block = initial_offset % K_BLOCK_SIZE; + let mut block_start_location = initial_offset - offset_in_block; + if offset_in_block > K_BLOCK_SIZE - 6 { + block_start_location += K_BLOCK_SIZE; + } + file_reader.seek(Start(block_start_location as u64)).expect("seek to initial_offset"); + Self { + file_reader, + checksum, + read_pos: block_start_location, + eof: false, + buf: [0; K_BLOCK_SIZE], + buf_len: 0, + buf_read_idx: 0, + } + } + + pub fn read_next(&mut self) -> Result> { + let mut tmp: Option> = None; + loop { + if self.buf_read_idx >= self.buf_len { + self.read_buf()?; + } + let data_len = (self.buf[self.buf_read_idx + 4] as usize) + ((self.buf[self.buf_read_idx + 5] as usize) << 8); + let record_type = self.buf[self.buf_read_idx + 6]; + self.buf_read_idx += 7; + // CRC check + self.check_crc(data_len)?; + match record_type { + K_FULL_TYPE => { + let end_idx = self.buf_read_idx + data_len; + let data = &self.buf[self.buf_read_idx..end_idx]; + self.buf_read_idx += data_len; + return Ok(Some(Slice::from_buf(data))); + } + K_FIRST_TYPE => { + tmp = Some(Vec::with_capacity(K_BLOCK_SIZE)); + let partial_data = &self.buf[self.buf_read_idx..]; + tmp.as_mut().unwrap().write(partial_data)?; + } + K_MIDDLE_TYPE => { + tmp.as_mut().unwrap().write(self.buf.as_ref())?; + } + K_LAST_TYPE => { + let end_idx = self.buf_read_idx + data_len; + let partial_data = &self.buf[self.buf_read_idx..end_idx]; + self.buf_read_idx += data_len; + tmp.as_mut().unwrap().write(partial_data)?; + let data = tmp.unwrap(); + return Ok(Some(Slice::from_vec(data))); + } + _ => { + return Err(Status::wrapper(LevelError::KCorruption, + format!("bad record_type: {}", record_type).into())); + } + } + } + } + + fn read_buf(&mut self) -> Result<()> { + self.buf_read_idx = 0; + self.buf_len = self.file_reader.read(self.buf.as_mut())?; + if self.buf_len < K_BLOCK_SIZE { + self.eof = true; + } + Ok(()) + } + + #[inline] + fn check_crc(&self, data_len: usize) -> Result<()> { + if !self.checksum { + return Ok(()); + } + let crc_bytes = &self.buf[(self.buf_read_idx - 7)..(self.buf_read_idx - 3)]; + let expect = Coding::decode_fixed32(crc_bytes); + let data = &self.buf[(self.buf_read_idx - 1)..(self.buf_read_idx + data_len)]; + let crc = data.as_crc(); + let mask = CRC::mask(crc); + if expect == mask { + Ok(()) + } else { + Err(Status::wrapper(LevelError::KCorruption, "bad record, crc check failed".into())) + } + } +} \ No newline at end of file diff --git a/src/db/log_wr_test.rs b/src/db/log_wr_test.rs new file mode 100644 index 0000000..98b1648 --- /dev/null +++ b/src/db/log_wr_test.rs @@ -0,0 +1,61 @@ + +mod test { + use std::fs::File; + use crate::db::log_reader::LogReader; + use crate::db::log_writer::LogWriter; + use crate::traits::coding_trait::CodingTrait; + use crate::util::coding::Coding; + use crate::util::crc::{AsCrc, ToMask}; + use crate::util::Result; + use crate::util::slice::Slice; + + #[test] + fn write() -> Result<()> { + let file = box File::create("../../1.bin")?; + let mut writer = LogWriter::new(file); + let sample: Vec = ('0'..='9').map(|a|a as u8).collect(); + for i in 0..100 { + let slice = generate_slice(i, &sample); + writer.add_record(slice)?; + } + Ok(()) + } + + #[test] + fn read() -> Result<()> { + let file = box File::open("../../1.bin")?; + let mut reader = LogReader::new(file, true, 0); + let sample: Vec = ('0'..='9').map(|a|a as u8).collect(); + for i in 0..100 { + let slice = reader.read_next().expect("not error").expect("must have record"); + let mut expect = generate_slice(i, &sample); + assert_eq!(expect.len(), slice.len()); + assert_eq!(expect.as_ref(), slice.as_ref()) + } + Ok(()) + } + + fn generate_slice(i: usize, sample: &Vec) -> Slice { + let mut slice = Vec::with_capacity(64); + for j in 0..=i { + slice.push(sample[j%10]); + } + Slice::from_vec(slice) + } + + #[test] + fn test() { + let expect_crc_bytes = [0xD1, 0xB1, 0x09, 0x9A]; + let expect_crc = Coding::decode_fixed32(expect_crc_bytes.as_ref()); + let raw_bytes = [0x01_u8, 0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, + 0x39, 0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39]; + let crc = raw_bytes.as_crc().to_mask(); + let partial_extend = raw_bytes[0..1].as_crc(); + let crc1 = raw_bytes[1..].as_crc_extend(partial_extend).to_mask(); + println!("expect_crc: {}, crc: {}, crc1: {}", expect_crc, crc, crc1); + assert_eq!(expect_crc, crc); + assert_eq!(expect_crc, crc1); + + } + +} \ No newline at end of file diff --git a/src/db/log_writer.rs b/src/db/log_writer.rs new file mode 100644 index 0000000..42406db --- /dev/null +++ b/src/db/log_writer.rs @@ -0,0 +1,98 @@ +use std::io::Write; +use crate::traits::coding_trait::CodingTrait; +use crate::util::coding::Coding; +use crate::util::crc::{AsCrc, CRC}; +use crate::util::slice::Slice; +use crate::util::Result; + +pub const K_ZERO_TYPE: u8 = 0; +pub const K_FULL_TYPE: u8 = 1; +pub const K_FIRST_TYPE: u8 = 2; +pub const K_MIDDLE_TYPE: u8 = 3; +pub const K_LAST_TYPE: u8 = 4; + +pub const K_MAX_RECORD_TYPE: usize = K_LAST_TYPE as usize; +/// Log block size +pub const K_BLOCK_SIZE: usize = 32768; + +/// Header is checksum (4 bytes), length (2 bytes), type (1 byte). +pub const K_HEADER_SIZE: usize = 4 + 2 + 1; + +const K_EMPTY_BYTES: [u8; 6] = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00]; + +pub struct LogWriter { + file_writer: Box, + /// Offset in current block + block_offset: usize, + + type_crc: [u32; K_MAX_RECORD_TYPE + 1], +} + +impl LogWriter { + pub fn new(file_writer: Box) -> LogWriter { + let mut type_crc = [0_u32; K_MAX_RECORD_TYPE + 1]; + init_type_crc(&mut type_crc); + Self { + file_writer, + block_offset: 0, + type_crc, + } + } + + pub fn add_record(&mut self, slice: Slice) -> Result<()> { + let left_over = K_BLOCK_SIZE - self.block_offset; + assert!(left_over >= 0); + let mut left = slice.len(); + let mut begin = true; + let mut start_idx = 0; + while begin || left > 0 { + if left_over < K_HEADER_SIZE { + if left_over > 0 { + self.file_writer.write(&K_EMPTY_BYTES[0..left_over])?; + } + self.block_offset = 0; + } + let avail = K_BLOCK_SIZE - self.block_offset - K_HEADER_SIZE; + let fragment_length = if left < avail { left } else { avail }; + let end = left == fragment_length; + let record_type = if begin && end { + K_FULL_TYPE + } else if begin { + K_FIRST_TYPE + } else if end { + K_LAST_TYPE + } else { + K_MIDDLE_TYPE + }; + self.emit_physical_record(record_type, slice.as_sub_ref(start_idx, fragment_length))?; + begin = false; + left -= fragment_length; + start_idx += fragment_length; + } + Ok(()) + } + + fn emit_physical_record(&mut self, record_type: u8, data: &[u8]) -> Result<()> { + let mut header = [0_u8; K_HEADER_SIZE]; + header[4] = (data.len() & 0xff) as u8; + header[5] = (data.len() >> 8) as u8; + header[6] = record_type; + let mut crc = CRC::extend(self.type_crc[record_type as usize], data); + crc = CRC::mask(crc); + Coding::encode_fixed32(crc, header.as_mut(), 0); + self.file_writer.write(header.as_ref())?; + self.block_offset += K_HEADER_SIZE; + if !data.is_empty() { + self.file_writer.write(data)?; + self.block_offset += data.len(); + } + self.file_writer.flush()?; + Ok(()) + } +} + +fn init_type_crc(type_crc: &mut [u32]) { + for i in 0..=K_MAX_RECORD_TYPE { + type_crc[i] = [i as u8].as_crc(); + } +} \ No newline at end of file diff --git a/src/db/mod.rs b/src/db/mod.rs index e69de29..e696750 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -0,0 +1,3 @@ +pub mod log_writer; +pub mod log_reader; +mod log_wr_test; \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 3768322..064907a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(box_syntax)] + mod db; mod table; mod util; diff --git a/src/util/coding.rs b/src/util/coding.rs index 33c8769..125c042 100644 --- a/src/util/coding.rs +++ b/src/util/coding.rs @@ -1,4 +1,3 @@ -use std::ops::Deref; use crate::traits::coding_trait::CodingTrait; use crate::traits::coding_trait::Coding32; use crate::traits::coding_trait::Coding64; diff --git a/src/util/crc.rs b/src/util/crc.rs index cf7918b..d15bb3a 100644 --- a/src/util/crc.rs +++ b/src/util/crc.rs @@ -260,6 +260,13 @@ impl AsCrc for &[T] { } } +impl AsCrc for [u8] { + #[inline] + fn as_crc_extend(&self, crc: u32) -> u32 { + CRC::extend(crc, self) + } +} + impl AsCrc for &str { #[inline] fn as_crc_extend(&self, crc: u32) -> u32 { @@ -288,6 +295,22 @@ impl AsCrc for String { } } +pub trait ToMask { + fn to_mask(self) -> u32; + fn unmask(self) -> u32; +} + +impl ToMask for u32 { + #[inline] + fn to_mask(self) -> u32 { + CRC::mask(self) + } + #[inline] + fn unmask(self) -> u32 { + CRC::unmask(self) + } +} + pub struct CRC {} /// step1, Process one byte at a time. @@ -355,7 +378,8 @@ impl CRC { let mut l = init_crc ^ K_CRC32_XOR; // 4 byte align offset - let x = ptr_align_by4_offset(data.as_ptr()); + let mut x = ptr_align_by4_offset(data.as_ptr()); + x = if n < x { n } else { x }; // println!("x: {}, l: {:x}, n: {}", x, l, n); while s < x { step1!(data, s, l); @@ -430,7 +454,7 @@ impl CRC { #[inline] pub fn mask(crc: u32) -> u32 { // Rotate right by 15 bits and add a constant. - ((crc >> 15) | (crc << 17)) + K_MASK_DELTA + ((crc >> 15) | (crc << 17)).wrapping_add(K_MASK_DELTA) } /// 将CRC掩码转为CRC码 diff --git a/src/util/crc_test.rs b/src/util/crc_test.rs index 8d34ff9..c67c0db 100644 --- a/src/util/crc_test.rs +++ b/src/util/crc_test.rs @@ -1,4 +1,4 @@ -use crate::util::crc::{AsCrc, CRC}; +use crate::util::crc::{AsCrc, CRC, ToMask}; use crate::util::slice::Slice; #[test] @@ -55,4 +55,15 @@ fn test_mask() { assert_ne!(crc, CRC::mask(CRC::mask(crc))); assert_eq!(crc, CRC::unmask(CRC::mask(crc))); assert_eq!(crc, CRC::unmask(CRC::unmask(CRC::mask(CRC::mask(crc))))); +} + +#[test] +fn test_mask2() { + let a0 = [1]; + let a1 = [0, 97, 98, 99, 100]; + let a2 = [1, 97, 98, 99, 100]; + let crc0 = a1[1..].as_crc_extend(a0.as_crc()).to_mask(); + let crc1 = a2.as_crc().to_mask(); + println!("crc0: {}, crc1: {}", crc0, crc1); + assert_eq!(crc0, crc1); } \ No newline at end of file diff --git a/src/util/mod.rs b/src/util/mod.rs index 3d2299f..cb17c90 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -1,7 +1,9 @@ -use crate::util::status::LevelError; use std::result; + pub use arena::Arena; +use crate::util::status::Status; + /// 常量定义 pub mod r#const; @@ -16,7 +18,7 @@ pub mod status; mod status_test; pub mod comparator; mod comparator_test; -mod crc; +pub mod crc; mod crc_test; pub mod bloom_filter; mod bloom_filter_test; @@ -24,7 +26,7 @@ pub mod filter_policy; mod filter_policy_test; /// 定义别名 -pub type ResultT = result::Result; +pub type Result = result::Result; pub mod histogram; mod histogram_test; diff --git a/src/util/slice.rs b/src/util/slice.rs index 7c0bbb3..26ea8b1 100644 --- a/src/util/slice.rs +++ b/src/util/slice.rs @@ -3,6 +3,7 @@ use std::borrow::Cow; use std::cmp::Ordering; use std::ops::Deref; +#[derive(Debug)] pub struct Slice { data: Vec, } @@ -31,6 +32,14 @@ impl Slice { data: buf.to_owned() } } + + #[inline] + pub fn from_vec(data: Vec) -> Self { + Self { + data + } + } + /// 获取 slice 长度 #[inline] pub fn size(&self) -> usize { @@ -43,6 +52,11 @@ impl Slice { self.data.is_empty() } + #[inline] + pub fn as_sub_ref(&self, start: usize, length: usize) -> &[u8] { + &(**self)[start..(start+length)] + } + /// 移除头部 n 个元素 pub fn remove_prefix(&self, n: usize) -> Slice { assert!(self.size() >= n); diff --git a/src/util/status.rs b/src/util/status.rs index 9bb6662..8f5365d 100644 --- a/src/util/status.rs +++ b/src/util/status.rs @@ -1,12 +1,13 @@ -use std::fmt::{Display, Error, Formatter}; +use std::fmt::{Display, Formatter}; +use std::io; use crate::util::r#const::COLON_WHITE_SPACE; -use crate::util::ResultT; use crate::util::slice::Slice; -use crate::util::status::LevelError::{KCorruption, KIOError, KInvalidArgument, KNotSupported, KNotFound, KOk}; +use crate::util::status::LevelError::{KCorruption, KIOError, KInvalidArgument, KNotSupported, KNotFound, KOk, KBadRecord}; /// db 中的返回状态,将错误号和错误信息封装成Status类,统一进行处理。 /// 在 leveldb的实现里, 为了节省空间Status将返回码(code), 错误信息message及长度打包存储于一个字符串数组中, 来存储错误信息。 /// 在该项目中, 使用LevelError 和 Slice 存储错误信息 +#[derive(Debug)] pub struct Status { err: LevelError, msg: Slice @@ -125,18 +126,17 @@ impl Status { /// ``` #[inline] pub fn to_string(self) -> String { - let err = &self.err; - - let msg_type = match &err { + let msg_type = match self.err { KOk => "OK", KNotFound => "NotFound: ", KCorruption => "Corruption: ", KNotSupported => "Not implemented: ", KInvalidArgument => "Invalid argument: ", - KIOError => "IO error: " + KIOError => "IO error: ", + KBadRecord=> "wal bad record", }; - if err.is_ok() { + if self.err.is_ok() { return String::from(msg_type); } @@ -171,6 +171,7 @@ impl Status { // } /// Status 的状态 +#[derive(Debug)] pub enum LevelError { KOk, KNotFound, @@ -178,6 +179,7 @@ pub enum LevelError { KNotSupported, KInvalidArgument, KIOError, + KBadRecord, } impl LevelError { @@ -235,7 +237,7 @@ impl LevelError { Status{ err: KNotFound, - msg: msg + msg } } @@ -258,7 +260,7 @@ impl LevelError { Status{ err: KCorruption, - msg: msg + msg } } @@ -267,7 +269,7 @@ impl LevelError { Status{ err: KNotSupported, - msg: msg + msg } } @@ -276,7 +278,7 @@ impl LevelError { Status{ err: KInvalidArgument, - msg: msg + msg } } @@ -300,7 +302,7 @@ impl LevelError { Status{ err: KIOError, - msg: msg + msg } } } @@ -336,12 +338,19 @@ impl TryFrom for LevelError { 3 => Ok(KNotSupported), 4 => Ok(KInvalidArgument), 5 => Ok(KIOError), + 6 => Ok(KBadRecord), // all other numbers _ => Err(String::from(format!("Unknown code: {}", value))) } } } +impl From for Status { + fn from(e: io::Error) -> Self { + LevelError::io_error(e.to_string().into(), "".into()) + } +} + impl Display for LevelError { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let mut print = String::new(); @@ -352,7 +361,8 @@ impl Display for LevelError { KCorruption => "Corruption: ", KNotSupported => "Not implemented: ", KInvalidArgument => "Invalid argument: ", - KIOError => "IO error: " + KIOError => "IO error: ", + KBadRecord => "wal bad record: ", }; print.push_str(msg_type); diff --git a/src/util/status_test.rs b/src/util/status_test.rs index eadccd3..2ac1030 100644 --- a/src/util/status_test.rs +++ b/src/util/status_test.rs @@ -1,6 +1,5 @@ mod test { - use std::borrow::Cow; use crate::util::r#const::COLON_WHITE_SPACE; use crate::util::slice::Slice; use crate::util::status::{LevelError, Status}; -- Gitee From 1ed89a3eecdd2d9d1a5266fb9eaf6d604821af2a Mon Sep 17 00:00:00 2001 From: wangboo <5417808+wangboa@user.noreply.gitee.com> Date: Wed, 4 Jan 2023 20:56:32 +0800 Subject: [PATCH 07/10] modify readme file --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 828a355..07c70b6 100644 --- a/README.md +++ b/README.md @@ -65,8 +65,8 @@ LSM tree 是许多 KV型或日志型数据库所依赖的核心实现,例如Bi | skiplist | 未认领 | | | MemTableIterator | 未认领 | | | MemTable | 未认领 | | -| LogReader | wangboo | | -| LogWriter | wangboo | | +| LogReader | wangboo | 90% | +| LogWriter | wangboo | 90% | | TableCache | 未认领 | | | FileMetaData | 未认领 | | | VersionEdit | 未认领 | | -- Gitee From 2ea0b6950e3ce28ade39c9293c69aadd0f74e64e Mon Sep 17 00:00:00 2001 From: fengyang Date: Tue, 10 Jan 2023 20:42:34 +0800 Subject: [PATCH 08/10] =?UTF-8?q?Status=20=E9=80=BB=E8=BE=91=E7=B2=BE?= =?UTF-8?q?=E7=AE=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/util/status.rs | 30 ++++++------------------------ 1 file changed, 6 insertions(+), 24 deletions(-) diff --git a/src/util/status.rs b/src/util/status.rs index 87a5381..b24dbe2 100644 --- a/src/util/status.rs +++ b/src/util/status.rs @@ -183,45 +183,27 @@ pub enum LevelError { impl LevelError { pub fn is_ok(&self) -> bool { - match self { - KOk => true, - _ => false - } + matches!(*self, KOk) } pub fn is_not_found(&self) -> bool { - match self { - KNotFound => true, - _ => false - } + matches!(*self, KNotFound) } pub fn is_corruption(&self) -> bool { - match self { - KCorruption => true, - _ => false - } + matches!(*self, KCorruption) } pub fn is_io_error(&self) -> bool { - match self { - KIOError => true, - _ => false - } + matches!(*self, KIOError) } pub fn is_not_supported_error(&self) -> bool { - match self { - KNotSupported => true, - _ => false - } + matches!(*self, KNotSupported) } pub fn is_invalid_argument(&self) -> bool { - match self { - KInvalidArgument => true, - _ => false - } + matches!(*self, KInvalidArgument) } pub fn ok() -> Status { -- Gitee From 5e7ae5933e48f248fca20f21beba6a31b571033c Mon Sep 17 00:00:00 2001 From: fengyang Date: Tue, 10 Jan 2023 21:16:31 +0800 Subject: [PATCH 09/10] =?UTF-8?q?Status=20=E9=80=BB=E8=BE=91=E7=B2=BE?= =?UTF-8?q?=E7=AE=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/traits/filter_policy_trait.rs | 5 ++--- src/util/filter_policy.rs | 18 +++++++++++++----- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/src/traits/filter_policy_trait.rs b/src/traits/filter_policy_trait.rs index ea17aea..361e5a8 100644 --- a/src/traits/filter_policy_trait.rs +++ b/src/traits/filter_policy_trait.rs @@ -12,13 +12,12 @@ pub trait FilterPolicy { /// 根据指定的参数创建过滤器,并返回结果, 结果为dst的原始内容 + append结果。 /// 参数keys[0,n-1]包含依据用户提供的comparator排序的key列表--可重复, - /// 并把根据这些key创建的filter追加到 dst中。 + /// 并把根据这些key创建的filter追加返回。 /// /// # Arguments /// /// * `keys`: /// * `n`: - /// * `dst`: /// /// returns: String /// @@ -27,7 +26,7 @@ pub trait FilterPolicy { /// ``` /// /// ``` - fn create_filter(&self, keys: Slice, n: u32, dst: String) -> String; + fn create_filter(&self, keys: Slice, n: u32) -> String; fn key_may_match(key: &Slice, filter: &Slice) -> bool; } \ No newline at end of file diff --git a/src/util/filter_policy.rs b/src/util/filter_policy.rs index af7c915..46303bf 100644 --- a/src/util/filter_policy.rs +++ b/src/util/filter_policy.rs @@ -57,12 +57,20 @@ impl FilterPolicy for BloomFilterPolicy { String::from("leveldb.BuiltinBloomFilter2") } - fn create_filter(&self, keys: Slice, n: u32, dst: String) -> String { + fn create_filter(&self, keys: Slice, n: usize) -> String { // 根据指定的参数创建过滤器,并返回结果, 结果为dst的原始内容 + append结果。 - // 参数keys[0,n-1]包含依据用户提供的comparator排序的key列表--可重复, - // 并把根据这些key创建的filter追加到 dst中。 - // - todo!() + // 参数keys[0,n-1]包含依据用户提供的comparator排序的key列表--可重复, 并把根据这些key创建的filter追加到 dst中。 + let mut bits: usize = n * self.bits_per_key; + + // For small n, we can see a very high false positive rate. Fix it + // by enforcing a minimum bloom filter length. + if bits < 64 { + bits = 64; + } + + let bytes: usize = (bits + 7) / 8; + bits = bytes * 8; + } fn key_may_match(key: &Slice, filter: &Slice) -> bool { -- Gitee From 49a0c1452941e34a3cbbe51d29679f0afcf45366 Mon Sep 17 00:00:00 2001 From: fengyang Date: Mon, 16 Jan 2023 11:14:57 +0800 Subject: [PATCH 10/10] md --- README.md | 50 ++++++++++++++++++++++++++++++++++---------------- 1 file changed, 34 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 07c70b6..67da271 100644 --- a/README.md +++ b/README.md @@ -58,21 +58,39 @@ LSM tree 是许多 KV型或日志型数据库所依赖的核心实现,例如Bi | Status | fengyang | 100% | | Slice | wangboo | 100% | + 2. 1.1.0 版本, 完成基础零部件 -| 功能模块 | 完成人 | 进度 | -|------------------|---------|-----| -| skiplist | 未认领 | | -| MemTableIterator | 未认领 | | -| MemTable | 未认领 | | -| LogReader | wangboo | 90% | -| LogWriter | wangboo | 90% | -| TableCache | 未认领 | | -| FileMetaData | 未认领 | | -| VersionEdit | 未认领 | | -| VersionSet | 未认领 | | -| ReadOptions | 未认领 | | -| WriteOptions | 未认领 | | -| WriteBatch | 未认领 | | - -3. 1.2.0 版本, 完成核心组件 \ No newline at end of file +| 功能模块 | 完成人 | 进度 | +|-------------------------------------------------------------------------------|----------|---| +| util.Options(ReadOptions、WriteOptions) | 未认领 | | +| util.ENV(WritableFile、SequentialFile、RandomAccessFile、FileLock) | 未认领 | | +| util.Logger | 未认领 | | +| table.Block、BlockBuilder、FilterBlockBuilder | 未认领 | | +| FilterBlock、FilterBlockReader | 未认领 | | +| table.format(Footer、BlockHandle) | 未认领 | | +| db.dbformat(InternalKeyComparator、InternalFilterPolicy、LookupKey、InternalKey) | 未认领 | | +| db.SkipList | 未认领 | | +| table.Iterator(DBIter、MergingIterator、TwoLevelIterator...) | 未认领 | | +| IteratorWrapper | 未认领 | | +| db.MemTable(MemTable、MemTableIterator) | 未认领 | | +| table.Table | 未认领 | | +| db.leveldb_util | 未认领 | | +| db.log_format | 未认领 | | +| db.LogReader | wangboo | 90% | +| db.LogWriter | wangboo | 90% | +| db.TableCache | colagy | | +| db.VersionEdit(Tag、VersionEdit、FileMetaData) | fengyang | | +| db.VersionSet(Version、LevelFileNumIterator、SaverState) | fengyang | | +| WriteBatch | 未认领 | | + + +3. 1.2.0 版本, 完成核心组件 + +| 功能模块 | 完成人 | 进度 | +|--------------------|---------|-----| +| DB(DBImpl、ModelDB) | 未认领 | | +| Repairer | 未认领 | | +| Snapshot | 未认领 | | +| DumpFile | 未认领 | | +| | 未认领 | | \ No newline at end of file -- Gitee