From c28bac4515cba7df2c956b572cc3d5f5b1320fba Mon Sep 17 00:00:00 2001 From: wangboo <5417808+wangboa@user.noreply.gitee.com> Date: Wed, 4 Jan 2023 20:54:44 +0800 Subject: [PATCH 1/2] add log reader and writer --- 1.bin | Bin 0 -> 5750 bytes README.md | 28 +++++----- src/db/log_reader.rs | 114 ++++++++++++++++++++++++++++++++++++++++ src/db/log_wr_test.rs | 61 +++++++++++++++++++++ src/db/log_writer.rs | 98 ++++++++++++++++++++++++++++++++++ src/db/mod.rs | 3 ++ src/lib.rs | 2 + src/util/coding.rs | 1 - src/util/crc.rs | 28 +++++++++- src/util/crc_test.rs | 13 ++++- src/util/mod.rs | 8 +-- src/util/slice.rs | 14 +++++ src/util/status.rs | 38 +++++++++----- src/util/status_test.rs | 1 - 14 files changed, 373 insertions(+), 36 deletions(-) create mode 100644 1.bin create mode 100644 src/db/log_reader.rs create mode 100644 src/db/log_wr_test.rs create mode 100644 src/db/log_writer.rs diff --git a/1.bin b/1.bin new file mode 100644 index 0000000000000000000000000000000000000000..ae41623ec551be8d83267e117d57ff87f5865988 GIT binary patch literal 5750 zcmb2_U-OQUfze=o)!Py#AZ@twho(0(h+z~I7|6l`W*HX=FwAC!Fin;;Zs1{qvQ6Kg z`!2~2*t2nW}LI$r2b0(oxb7b)=v9OX8Pmv>_#&Tst&0BdwiVU0; z84VN&sxtIA_4|k-VP!_QnY;s)h)`$T_)%xGGBFBGHt=m>S0PHJ>GUP;e5%ALHMe6}uWT1*bdckR?8QIDzI)J01Aq-rwzej~rYfMi|f*VD9h4N2E#VZw5* z!-x!hmQ!x3{x>E^qrpX!<#SEQ(rNhT)QoGUwv-uRATIHu z$BsfX4A)OMZEa7fAw~+KtkWDQHpTdu2-|K)%8fBuw?(PYi3;YJ+AS&+a;AnsW^HF) zd%IA@By$h7d(E!YG0LL$*Of(XR5HtwH6i`3JGBfmShsV(st46fGyIbhpzcXMq?rQV)HKkf{F0-M4^>Sx{a7p2=u2HA&9)u))b^vYndV^%b$0&LHq;_M zG)Xal>ZV#=UADC^kov|NB;5WmGl&M}8m^X-l?xKXS~I_bRCZ<8k%pp>U#f%Xj)oe zkYudI7DH1T4DS?{%Er>x3M2J9!8vg>w!@gYMIj=d)|Qyu_W0A7KyzD6Q<56g5@~OZ g*{OY7ijwGHkGa, + checksum: bool, + read_pos: usize, + eof: bool, + buf: [u8; K_BLOCK_SIZE], + buf_len: usize, + buf_read_idx: usize, +} + +pub trait SeekableReader: Read + Seek {} +// pub type SeekableReader = dyn Read + Seek; + +impl SeekableReader for File {} + +impl LogReader { + pub fn new(mut file_reader: Box, checksum: bool, initial_offset: usize) -> LogReader { + let offset_in_block = initial_offset % K_BLOCK_SIZE; + let mut block_start_location = initial_offset - offset_in_block; + if offset_in_block > K_BLOCK_SIZE - 6 { + block_start_location += K_BLOCK_SIZE; + } + file_reader.seek(Start(block_start_location as u64)).expect("seek to initial_offset"); + Self { + file_reader, + checksum, + read_pos: block_start_location, + eof: false, + buf: [0; K_BLOCK_SIZE], + buf_len: 0, + buf_read_idx: 0, + } + } + + pub fn read_next(&mut self) -> Result> { + let mut tmp: Option> = None; + loop { + if self.buf_read_idx >= self.buf_len { + self.read_buf()?; + } + let data_len = (self.buf[self.buf_read_idx + 4] as usize) + ((self.buf[self.buf_read_idx + 5] as usize) << 8); + let record_type = self.buf[self.buf_read_idx + 6]; + self.buf_read_idx += 7; + // CRC check + self.check_crc(data_len)?; + match record_type { + K_FULL_TYPE => { + let end_idx = self.buf_read_idx + data_len; + let data = &self.buf[self.buf_read_idx..end_idx]; + self.buf_read_idx += data_len; + return Ok(Some(Slice::from_buf(data))); + } + K_FIRST_TYPE => { + tmp = Some(Vec::with_capacity(K_BLOCK_SIZE)); + let partial_data = &self.buf[self.buf_read_idx..]; + tmp.as_mut().unwrap().write(partial_data)?; + } + K_MIDDLE_TYPE => { + tmp.as_mut().unwrap().write(self.buf.as_ref())?; + } + K_LAST_TYPE => { + let end_idx = self.buf_read_idx + data_len; + let partial_data = &self.buf[self.buf_read_idx..end_idx]; + self.buf_read_idx += data_len; + tmp.as_mut().unwrap().write(partial_data)?; + let data = tmp.unwrap(); + return Ok(Some(Slice::from_vec(data))); + } + _ => { + return Err(Status::wrapper(LevelError::KCorruption, + format!("bad record_type: {}", record_type).into())); + } + } + } + } + + fn read_buf(&mut self) -> Result<()> { + self.buf_read_idx = 0; + self.buf_len = self.file_reader.read(self.buf.as_mut())?; + if self.buf_len < K_BLOCK_SIZE { + self.eof = true; + } + Ok(()) + } + + #[inline] + fn check_crc(&self, data_len: usize) -> Result<()> { + if !self.checksum { + return Ok(()); + } + let crc_bytes = &self.buf[(self.buf_read_idx - 7)..(self.buf_read_idx - 3)]; + let expect = Coding::decode_fixed32(crc_bytes); + let data = &self.buf[(self.buf_read_idx - 1)..(self.buf_read_idx + data_len)]; + let crc = data.as_crc(); + let mask = CRC::mask(crc); + if expect == mask { + Ok(()) + } else { + Err(Status::wrapper(LevelError::KCorruption, "bad record, crc check failed".into())) + } + } +} \ No newline at end of file diff --git a/src/db/log_wr_test.rs b/src/db/log_wr_test.rs new file mode 100644 index 0000000..98b1648 --- /dev/null +++ b/src/db/log_wr_test.rs @@ -0,0 +1,61 @@ + +mod test { + use std::fs::File; + use crate::db::log_reader::LogReader; + use crate::db::log_writer::LogWriter; + use crate::traits::coding_trait::CodingTrait; + use crate::util::coding::Coding; + use crate::util::crc::{AsCrc, ToMask}; + use crate::util::Result; + use crate::util::slice::Slice; + + #[test] + fn write() -> Result<()> { + let file = box File::create("../../1.bin")?; + let mut writer = LogWriter::new(file); + let sample: Vec = ('0'..='9').map(|a|a as u8).collect(); + for i in 0..100 { + let slice = generate_slice(i, &sample); + writer.add_record(slice)?; + } + Ok(()) + } + + #[test] + fn read() -> Result<()> { + let file = box File::open("../../1.bin")?; + let mut reader = LogReader::new(file, true, 0); + let sample: Vec = ('0'..='9').map(|a|a as u8).collect(); + for i in 0..100 { + let slice = reader.read_next().expect("not error").expect("must have record"); + let mut expect = generate_slice(i, &sample); + assert_eq!(expect.len(), slice.len()); + assert_eq!(expect.as_ref(), slice.as_ref()) + } + Ok(()) + } + + fn generate_slice(i: usize, sample: &Vec) -> Slice { + let mut slice = Vec::with_capacity(64); + for j in 0..=i { + slice.push(sample[j%10]); + } + Slice::from_vec(slice) + } + + #[test] + fn test() { + let expect_crc_bytes = [0xD1, 0xB1, 0x09, 0x9A]; + let expect_crc = Coding::decode_fixed32(expect_crc_bytes.as_ref()); + let raw_bytes = [0x01_u8, 0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, + 0x39, 0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39]; + let crc = raw_bytes.as_crc().to_mask(); + let partial_extend = raw_bytes[0..1].as_crc(); + let crc1 = raw_bytes[1..].as_crc_extend(partial_extend).to_mask(); + println!("expect_crc: {}, crc: {}, crc1: {}", expect_crc, crc, crc1); + assert_eq!(expect_crc, crc); + assert_eq!(expect_crc, crc1); + + } + +} \ No newline at end of file diff --git a/src/db/log_writer.rs b/src/db/log_writer.rs new file mode 100644 index 0000000..42406db --- /dev/null +++ b/src/db/log_writer.rs @@ -0,0 +1,98 @@ +use std::io::Write; +use crate::traits::coding_trait::CodingTrait; +use crate::util::coding::Coding; +use crate::util::crc::{AsCrc, CRC}; +use crate::util::slice::Slice; +use crate::util::Result; + +pub const K_ZERO_TYPE: u8 = 0; +pub const K_FULL_TYPE: u8 = 1; +pub const K_FIRST_TYPE: u8 = 2; +pub const K_MIDDLE_TYPE: u8 = 3; +pub const K_LAST_TYPE: u8 = 4; + +pub const K_MAX_RECORD_TYPE: usize = K_LAST_TYPE as usize; +/// Log block size +pub const K_BLOCK_SIZE: usize = 32768; + +/// Header is checksum (4 bytes), length (2 bytes), type (1 byte). +pub const K_HEADER_SIZE: usize = 4 + 2 + 1; + +const K_EMPTY_BYTES: [u8; 6] = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00]; + +pub struct LogWriter { + file_writer: Box, + /// Offset in current block + block_offset: usize, + + type_crc: [u32; K_MAX_RECORD_TYPE + 1], +} + +impl LogWriter { + pub fn new(file_writer: Box) -> LogWriter { + let mut type_crc = [0_u32; K_MAX_RECORD_TYPE + 1]; + init_type_crc(&mut type_crc); + Self { + file_writer, + block_offset: 0, + type_crc, + } + } + + pub fn add_record(&mut self, slice: Slice) -> Result<()> { + let left_over = K_BLOCK_SIZE - self.block_offset; + assert!(left_over >= 0); + let mut left = slice.len(); + let mut begin = true; + let mut start_idx = 0; + while begin || left > 0 { + if left_over < K_HEADER_SIZE { + if left_over > 0 { + self.file_writer.write(&K_EMPTY_BYTES[0..left_over])?; + } + self.block_offset = 0; + } + let avail = K_BLOCK_SIZE - self.block_offset - K_HEADER_SIZE; + let fragment_length = if left < avail { left } else { avail }; + let end = left == fragment_length; + let record_type = if begin && end { + K_FULL_TYPE + } else if begin { + K_FIRST_TYPE + } else if end { + K_LAST_TYPE + } else { + K_MIDDLE_TYPE + }; + self.emit_physical_record(record_type, slice.as_sub_ref(start_idx, fragment_length))?; + begin = false; + left -= fragment_length; + start_idx += fragment_length; + } + Ok(()) + } + + fn emit_physical_record(&mut self, record_type: u8, data: &[u8]) -> Result<()> { + let mut header = [0_u8; K_HEADER_SIZE]; + header[4] = (data.len() & 0xff) as u8; + header[5] = (data.len() >> 8) as u8; + header[6] = record_type; + let mut crc = CRC::extend(self.type_crc[record_type as usize], data); + crc = CRC::mask(crc); + Coding::encode_fixed32(crc, header.as_mut(), 0); + self.file_writer.write(header.as_ref())?; + self.block_offset += K_HEADER_SIZE; + if !data.is_empty() { + self.file_writer.write(data)?; + self.block_offset += data.len(); + } + self.file_writer.flush()?; + Ok(()) + } +} + +fn init_type_crc(type_crc: &mut [u32]) { + for i in 0..=K_MAX_RECORD_TYPE { + type_crc[i] = [i as u8].as_crc(); + } +} \ No newline at end of file diff --git a/src/db/mod.rs b/src/db/mod.rs index e69de29..e696750 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -0,0 +1,3 @@ +pub mod log_writer; +pub mod log_reader; +mod log_wr_test; \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 3768322..064907a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(box_syntax)] + mod db; mod table; mod util; diff --git a/src/util/coding.rs b/src/util/coding.rs index 33c8769..125c042 100644 --- a/src/util/coding.rs +++ b/src/util/coding.rs @@ -1,4 +1,3 @@ -use std::ops::Deref; use crate::traits::coding_trait::CodingTrait; use crate::traits::coding_trait::Coding32; use crate::traits::coding_trait::Coding64; diff --git a/src/util/crc.rs b/src/util/crc.rs index cf7918b..d15bb3a 100644 --- a/src/util/crc.rs +++ b/src/util/crc.rs @@ -260,6 +260,13 @@ impl AsCrc for &[T] { } } +impl AsCrc for [u8] { + #[inline] + fn as_crc_extend(&self, crc: u32) -> u32 { + CRC::extend(crc, self) + } +} + impl AsCrc for &str { #[inline] fn as_crc_extend(&self, crc: u32) -> u32 { @@ -288,6 +295,22 @@ impl AsCrc for String { } } +pub trait ToMask { + fn to_mask(self) -> u32; + fn unmask(self) -> u32; +} + +impl ToMask for u32 { + #[inline] + fn to_mask(self) -> u32 { + CRC::mask(self) + } + #[inline] + fn unmask(self) -> u32 { + CRC::unmask(self) + } +} + pub struct CRC {} /// step1, Process one byte at a time. @@ -355,7 +378,8 @@ impl CRC { let mut l = init_crc ^ K_CRC32_XOR; // 4 byte align offset - let x = ptr_align_by4_offset(data.as_ptr()); + let mut x = ptr_align_by4_offset(data.as_ptr()); + x = if n < x { n } else { x }; // println!("x: {}, l: {:x}, n: {}", x, l, n); while s < x { step1!(data, s, l); @@ -430,7 +454,7 @@ impl CRC { #[inline] pub fn mask(crc: u32) -> u32 { // Rotate right by 15 bits and add a constant. - ((crc >> 15) | (crc << 17)) + K_MASK_DELTA + ((crc >> 15) | (crc << 17)).wrapping_add(K_MASK_DELTA) } /// 将CRC掩码转为CRC码 diff --git a/src/util/crc_test.rs b/src/util/crc_test.rs index 8d34ff9..c67c0db 100644 --- a/src/util/crc_test.rs +++ b/src/util/crc_test.rs @@ -1,4 +1,4 @@ -use crate::util::crc::{AsCrc, CRC}; +use crate::util::crc::{AsCrc, CRC, ToMask}; use crate::util::slice::Slice; #[test] @@ -55,4 +55,15 @@ fn test_mask() { assert_ne!(crc, CRC::mask(CRC::mask(crc))); assert_eq!(crc, CRC::unmask(CRC::mask(crc))); assert_eq!(crc, CRC::unmask(CRC::unmask(CRC::mask(CRC::mask(crc))))); +} + +#[test] +fn test_mask2() { + let a0 = [1]; + let a1 = [0, 97, 98, 99, 100]; + let a2 = [1, 97, 98, 99, 100]; + let crc0 = a1[1..].as_crc_extend(a0.as_crc()).to_mask(); + let crc1 = a2.as_crc().to_mask(); + println!("crc0: {}, crc1: {}", crc0, crc1); + assert_eq!(crc0, crc1); } \ No newline at end of file diff --git a/src/util/mod.rs b/src/util/mod.rs index 3d2299f..cb17c90 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -1,7 +1,9 @@ -use crate::util::status::LevelError; use std::result; + pub use arena::Arena; +use crate::util::status::Status; + /// 常量定义 pub mod r#const; @@ -16,7 +18,7 @@ pub mod status; mod status_test; pub mod comparator; mod comparator_test; -mod crc; +pub mod crc; mod crc_test; pub mod bloom_filter; mod bloom_filter_test; @@ -24,7 +26,7 @@ pub mod filter_policy; mod filter_policy_test; /// 定义别名 -pub type ResultT = result::Result; +pub type Result = result::Result; pub mod histogram; mod histogram_test; diff --git a/src/util/slice.rs b/src/util/slice.rs index 7c0bbb3..26ea8b1 100644 --- a/src/util/slice.rs +++ b/src/util/slice.rs @@ -3,6 +3,7 @@ use std::borrow::Cow; use std::cmp::Ordering; use std::ops::Deref; +#[derive(Debug)] pub struct Slice { data: Vec, } @@ -31,6 +32,14 @@ impl Slice { data: buf.to_owned() } } + + #[inline] + pub fn from_vec(data: Vec) -> Self { + Self { + data + } + } + /// 获取 slice 长度 #[inline] pub fn size(&self) -> usize { @@ -43,6 +52,11 @@ impl Slice { self.data.is_empty() } + #[inline] + pub fn as_sub_ref(&self, start: usize, length: usize) -> &[u8] { + &(**self)[start..(start+length)] + } + /// 移除头部 n 个元素 pub fn remove_prefix(&self, n: usize) -> Slice { assert!(self.size() >= n); diff --git a/src/util/status.rs b/src/util/status.rs index 9bb6662..8f5365d 100644 --- a/src/util/status.rs +++ b/src/util/status.rs @@ -1,12 +1,13 @@ -use std::fmt::{Display, Error, Formatter}; +use std::fmt::{Display, Formatter}; +use std::io; use crate::util::r#const::COLON_WHITE_SPACE; -use crate::util::ResultT; use crate::util::slice::Slice; -use crate::util::status::LevelError::{KCorruption, KIOError, KInvalidArgument, KNotSupported, KNotFound, KOk}; +use crate::util::status::LevelError::{KCorruption, KIOError, KInvalidArgument, KNotSupported, KNotFound, KOk, KBadRecord}; /// db 中的返回状态,将错误号和错误信息封装成Status类,统一进行处理。 /// 在 leveldb的实现里, 为了节省空间Status将返回码(code), 错误信息message及长度打包存储于一个字符串数组中, 来存储错误信息。 /// 在该项目中, 使用LevelError 和 Slice 存储错误信息 +#[derive(Debug)] pub struct Status { err: LevelError, msg: Slice @@ -125,18 +126,17 @@ impl Status { /// ``` #[inline] pub fn to_string(self) -> String { - let err = &self.err; - - let msg_type = match &err { + let msg_type = match self.err { KOk => "OK", KNotFound => "NotFound: ", KCorruption => "Corruption: ", KNotSupported => "Not implemented: ", KInvalidArgument => "Invalid argument: ", - KIOError => "IO error: " + KIOError => "IO error: ", + KBadRecord=> "wal bad record", }; - if err.is_ok() { + if self.err.is_ok() { return String::from(msg_type); } @@ -171,6 +171,7 @@ impl Status { // } /// Status 的状态 +#[derive(Debug)] pub enum LevelError { KOk, KNotFound, @@ -178,6 +179,7 @@ pub enum LevelError { KNotSupported, KInvalidArgument, KIOError, + KBadRecord, } impl LevelError { @@ -235,7 +237,7 @@ impl LevelError { Status{ err: KNotFound, - msg: msg + msg } } @@ -258,7 +260,7 @@ impl LevelError { Status{ err: KCorruption, - msg: msg + msg } } @@ -267,7 +269,7 @@ impl LevelError { Status{ err: KNotSupported, - msg: msg + msg } } @@ -276,7 +278,7 @@ impl LevelError { Status{ err: KInvalidArgument, - msg: msg + msg } } @@ -300,7 +302,7 @@ impl LevelError { Status{ err: KIOError, - msg: msg + msg } } } @@ -336,12 +338,19 @@ impl TryFrom for LevelError { 3 => Ok(KNotSupported), 4 => Ok(KInvalidArgument), 5 => Ok(KIOError), + 6 => Ok(KBadRecord), // all other numbers _ => Err(String::from(format!("Unknown code: {}", value))) } } } +impl From for Status { + fn from(e: io::Error) -> Self { + LevelError::io_error(e.to_string().into(), "".into()) + } +} + impl Display for LevelError { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let mut print = String::new(); @@ -352,7 +361,8 @@ impl Display for LevelError { KCorruption => "Corruption: ", KNotSupported => "Not implemented: ", KInvalidArgument => "Invalid argument: ", - KIOError => "IO error: " + KIOError => "IO error: ", + KBadRecord => "wal bad record: ", }; print.push_str(msg_type); diff --git a/src/util/status_test.rs b/src/util/status_test.rs index eadccd3..2ac1030 100644 --- a/src/util/status_test.rs +++ b/src/util/status_test.rs @@ -1,6 +1,5 @@ mod test { - use std::borrow::Cow; use crate::util::r#const::COLON_WHITE_SPACE; use crate::util::slice::Slice; use crate::util::status::{LevelError, Status}; -- Gitee From 1ed89a3eecdd2d9d1a5266fb9eaf6d604821af2a Mon Sep 17 00:00:00 2001 From: wangboo <5417808+wangboa@user.noreply.gitee.com> Date: Wed, 4 Jan 2023 20:56:32 +0800 Subject: [PATCH 2/2] modify readme file --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 828a355..07c70b6 100644 --- a/README.md +++ b/README.md @@ -65,8 +65,8 @@ LSM tree 是许多 KV型或日志型数据库所依赖的核心实现,例如Bi | skiplist | 未认领 | | | MemTableIterator | 未认领 | | | MemTable | 未认领 | | -| LogReader | wangboo | | -| LogWriter | wangboo | | +| LogReader | wangboo | 90% | +| LogWriter | wangboo | 90% | | TableCache | 未认领 | | | FileMetaData | 未认领 | | | VersionEdit | 未认领 | | -- Gitee