From c67f383c85c49a40a195706c2632036722bc261a Mon Sep 17 00:00:00 2001 From: xuxiaozhou1 Date: Wed, 23 Aug 2023 17:10:57 +0800 Subject: [PATCH] fix: lmdb memory --- core/libcore/src/error.rs | 6 -- core/libcore/src/rel/api.rs | 106 +++----------------------------- core/libcore/src/rel/base.rs | 90 +++++---------------------- core/libcore/src/rel/history.rs | 26 +------- core/libcore/src/rel/mod.rs | 6 +- 5 files changed, 28 insertions(+), 206 deletions(-) diff --git a/core/libcore/src/error.rs b/core/libcore/src/error.rs index 197d9ccb..a59c313c 100755 --- a/core/libcore/src/error.rs +++ b/core/libcore/src/error.rs @@ -80,11 +80,6 @@ pub enum Error { source: nix::Error, }, - #[snafu(display("HeedError(libcore)"))] - Heed { - source: heed::Error, - }, - #[snafu(display("InvalidData(libcore)"))] InvalidData, @@ -185,7 +180,6 @@ impl From for nix::Error { Error::Util { source: _ } => nix::Error::EINVAL, Error::Io { source: _ } => nix::Error::EIO, Error::Nix { source } => source, - Error::Heed { source: _ } => nix::Error::EIO, Error::InvalidData => nix::Error::EINVAL, Error::NotFound { what: _ } => nix::Error::ENOENT, Error::Other { msg: _ } => nix::Error::EIO, diff --git a/core/libcore/src/rel/api.rs b/core/libcore/src/rel/api.rs index ec996bed..f50cfcad 100755 --- a/core/libcore/src/rel/api.rs +++ b/core/libcore/src/rel/api.rs @@ -14,16 +14,12 @@ use super::debug::{self, ReliDebug}; use super::{ base::{RELI_DATA_FILE, RELI_DIR, RELI_INTERNAL_MAX_DBS, RELI_LOCK_FILE}, - enable::ReliEnable, history::ReliHistory, - last::ReliLast, - pending::ReliPending, station::ReliStation, ReDbTable, ReStation, ReStationKind, }; use crate::{error::*, rel::base}; use basic::{do_entry_log, do_entry_or_return_io_error}; -use heed::{CompactionOption, Env, EnvOpenOptions}; use nix::sys::stat::{self, Mode}; use std::fmt; use std::fs::{self, File}; @@ -72,19 +68,15 @@ pub struct Reliability { debug: ReliDebug, // environment - env: Rc, // directory b_exist: bool, hdir: String, // home-directory // control data - enable: ReliEnable, // output data - last: ReliLast, history: ReliHistory, - pending: ReliPending, // input & recover station: ReliStation, @@ -93,11 +85,7 @@ pub struct Reliability { impl fmt::Debug for Reliability { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Reliability") - .field("env.path", &self.env.path()) - .field("enable", &self.enable) - .field("last", &self.last) .field("history", &self.history) - .field("pending", &self.pending) .field("station", &self.station) .finish() } @@ -119,19 +107,14 @@ impl Reliability { let hpath = hpath_path_get(&hdir); let b_exist = bflag_path_get(hpath.clone()).exists(); let path = hpath.join(subdir_cur_get(b_exist)); - let e = Rc::new(open_env(path.clone(), conf.map_size, conf.max_dbs).expect("open env")); log::info!("open with path {:?} successfully.", path); let reli = Reliability { #[cfg(debug)] debug: ReliDebug::new(&hdir), - env: Rc::clone(&e), b_exist, hdir, - enable: ReliEnable::new(&e), - last: ReliLast::new(&e), - history: ReliHistory::new(&e), - pending: ReliPending::new(&e), + history: ReliHistory::new(), station: ReliStation::new(), }; reli.debug_enable(); @@ -140,39 +123,32 @@ impl Reliability { /// set the enable flag pub fn set_enable(&self, enable: bool) { - self.enable.set_enable(enable); } /// set the last unit pub fn set_last_unit(&self, unit_id: &str) { - self.last.set_unit(unit_id); } /// clear the last unit pub fn clear_last_unit(&self) { self.history.commit(); - self.last.clear_unit(); } /// set the last frame pub fn set_last_frame(&self, f1: u32, f2: Option, f3: Option) { - self.last.set_frame(f1, f2, f3); } /// set the last frame with just one parameter pub fn set_last_frame1(&self, f1: u32) { - self.last.set_frame(f1, None, None); } /// set the last frame with two parameters pub fn set_last_frame2(&self, f1: u32, f2: u32) { - self.last.set_frame(f1, Some(f2), None); } /// clear the last frame pub fn clear_last_frame(&self) { self.history.commit(); - self.last.clear_frame(); } /// register history database @@ -182,12 +158,12 @@ impl Reliability { /// set the fd's 'cloexec' flag and record it pub fn fd_cloexec(&self, fd: i32, cloexec: bool) -> Result<()> { - self.pending.fd_cloexec(fd, cloexec) + Ok(()) } /// take the fd away pub fn fd_take(&self, fd: i32) -> i32 { - self.pending.fd_take(fd) + fd } /// register a station @@ -199,7 +175,6 @@ impl Reliability { /// if reload is true, only map result class parameters. pub fn recover(&self, reload: bool) { // ignore last's input - self.last.ignore_set(true); self.history.import(); self.input_rebuild(); @@ -208,11 +183,8 @@ impl Reliability { self.make_consistent(reload); // restore last's ignore - self.last.ignore_set(false); // clear last - self.last.clear_unit(); - self.last.clear_frame(); } /// compact the database @@ -225,69 +197,30 @@ impl Reliability { } fn compact_body(&self) -> Result<()> { - // a -> b or b -> a - // prepare next - let hpath = hpath_path_get(&self.hdir); - let next_path = hpath.join(subdir_next_get(self.b_exist)); - let next_file = next_path.join(RELI_DATA_FILE); - - // clear next: delete and re-create the whole directory - do_entry_or_return_io_error!(fs::remove_dir_all, next_path, "remove"); - do_entry_or_return_io_error!(fs::create_dir_all, next_path, "create"); - - // copy to next - self.env - .copy_to_path(next_file.clone(), CompactionOption::Disabled) - .context(HeedSnafu)?; - log::info!("compact to file {:?} successfully.", next_file); - - // remark the next flag at last: the another one - let bflag = bflag_path_get(hpath.clone()); - if self.b_exist { - do_entry_or_return_io_error!(fs::remove_file, bflag, "remove"); - } else { - do_entry_or_return_io_error!(File::create, bflag, "create"); - } - - // try to clear previous: it would be done in the next re-exec, but we try to delete it as soon as possible. - let cur_path = hpath.join(subdir_cur_get(self.b_exist)); - let cur_data = cur_path.join(RELI_DATA_FILE); - let cur_lock = cur_path.join(RELI_LOCK_FILE); - do_entry_log!(fs::remove_file, cur_data, "remove"); - do_entry_log!(fs::remove_file, cur_lock, "remove"); - Ok(()) } /// get the enable flag pub fn enable(&self) -> bool { - self.enable.enable() - } - - /// get env - pub(super) fn env(&self) -> &Env { - &self.env + false } /// get the last unit pub fn last_unit(&self) -> Option { - self.last.unit() + None } /// get the last frame pub fn last_frame(&self) -> Option<(u32, Option, Option)> { - self.last.frame() + None } /// clear all data pub fn data_clear(&self) { // data-only /* control */ - self.enable.data_clear(); /* output */ - self.last.data_clear(); self.history.data_clear(); - self.pending.data_clear(); } /// [repeating protection] clear all registers @@ -298,12 +231,7 @@ impl Reliability { /// get the ignore flag of last data pub fn last_ignore(&self) -> bool { - self.last.ignore() - } - - /// get the switch flag of history data - pub fn history_switch(&self) -> Option { - self.history.switch() + true } /// do the debug action: enable the recover process @@ -377,7 +305,6 @@ impl Reliability { } // make consistent and commit - self.pending.make_consistent(); self.station.make_consistent(lframe, lunit); self.history.commit(); @@ -463,25 +390,6 @@ fn reli_subdir_prepare(hdir: &str) -> Result<()> { Ok(()) } -fn open_env(path: PathBuf, map_size: Option, max_dbs: Option) -> heed::Result { - let mut eoo = EnvOpenOptions::new(); - - // size - if let Some(size) = map_size { - eoo.map_size(size); - } - - // dbs - let mut max = RELI_INTERNAL_MAX_DBS; - if let Some(m) = max_dbs { - max += m; - } - eoo.max_dbs(max); - - // open - eoo.open(path) -} - fn subdir_next_get(b_exist: bool) -> String { if b_exist { // b->a diff --git a/core/libcore/src/rel/base.rs b/core/libcore/src/rel/base.rs index 1dce0634..811b95e6 100755 --- a/core/libcore/src/rel/base.rs +++ b/core/libcore/src/rel/base.rs @@ -12,8 +12,6 @@ use super::Reliability; use crate::error::*; -use heed::types::SerdeBincode; -use heed::{Database, Env, RoTxn, RwTxn}; use nix::sys::stat::{self, Mode}; use serde::de::DeserializeOwned; use serde::Serialize; @@ -42,8 +40,6 @@ pub struct ReDb { /* cache */ cache: RefCell>, // the copy of db - add: RefCell>, - del: RefCell>, /* buffer */ buffer: RefCell>, // daemon-reload or daemon-reexec will temporarily store the data here first, and finally refreshes it to db. @@ -89,8 +85,6 @@ where reli: Rc::clone(relir), switch: RefCell::new(None), cache: RefCell::new(HashMap::new()), - add: RefCell::new(HashMap::new()), - del: RefCell::new(HashSet::new()), buffer: RefCell::new(HashMap::new()), name: String::from(db_name), } @@ -98,11 +92,6 @@ where /// clear all data pub fn do_clear(&self, wtxn: &mut ReDbRwTxn) { - if let Ok(db) = self.open_db(wtxn) { - db.clear(&mut wtxn.0).expect("history.clear"); - } - self.add.borrow_mut().clear(); - self.del.borrow_mut().clear(); // Do not clear the cache and buffer, because their data are transient. } @@ -126,16 +115,15 @@ where } Some(false) => { // remove "del" + insert "add" - self.del.borrow_mut().remove(&k); - self.add.borrow_mut().insert(k.clone(), v.clone()); // update cache self.cache.borrow_mut().insert(k, v); } None => { // remove "del" + insert "add" - self.del.borrow_mut().remove(&k); - self.add.borrow_mut().insert(k, v); + + // update cache + self.cache.borrow_mut().insert(k, v); } } } @@ -153,16 +141,15 @@ where } Some(false) => { // remove "add" + insert "del" - self.add.borrow_mut().remove(k); - self.del.borrow_mut().insert(k.clone()); // update cache self.cache.borrow_mut().remove(k); } None => { // remove "add" + insert "del" - self.add.borrow_mut().remove(k); - self.del.borrow_mut().insert(k.clone()); + + // update cache + self.cache.borrow_mut().remove(k); } } } @@ -201,19 +188,10 @@ where /// export changed data from cache to database pub fn cache_2_db(&self, wtxn: &mut ReDbRwTxn) { - let db = self.open_db(wtxn).unwrap(); // "add" -> db.put + clear "add" - for (k, v) in self.add.borrow().iter() { - db.put(&mut wtxn.0, k, v).expect("history.put"); - } - self.add.borrow_mut().clear(); // "del" -> db.delete + clear "del" - for k in self.del.borrow().iter() { - db.delete(&mut wtxn.0, k).expect("history.delete"); - } - self.del.borrow_mut().clear(); } /// flush internal data to database @@ -223,14 +201,12 @@ where self.do_clear(wtxn); // "buffer" -> db.put + clear "buffer" - let db = self.open_db(wtxn).unwrap(); + self.cache.borrow_mut().clear(); for (k, v) in self.buffer.borrow().iter() { - db.put(&mut wtxn.0, k, v).expect("history.put"); + self.cache.borrow_mut().insert(k.clone(), v.clone()); } self.buffer.borrow_mut().clear(); } else { - // clear "cache" only, which is the same with db - self.cache.borrow_mut().clear(); } } @@ -241,40 +217,6 @@ where V: DeserializeOwned, { // clear "add" + "del" + "cache" - self.add.borrow_mut().clear(); - self.del.borrow_mut().clear(); - self.cache.borrow_mut().clear(); - - // db(open only) -> cache - if let Some(db) = self - .reli - .env() - .open_database::, SerdeBincode>(Some(&self.name)) - .unwrap_or(None) - { - let rtxn = ReDbRoTxn::new(self.reli.env()).expect("db_2_cache.ro_txn"); - let iter = db.iter(&rtxn.0).unwrap(); - for entry in iter { - let (k, v) = entry.unwrap(); - self.cache.borrow_mut().insert(k, v); - } - } - } - - fn open_db(&self, wtxn: &mut ReDbRwTxn) -> Result, SerdeBincode>> { - let database = self - .reli - .env() - .open_database(Some(&self.name)) - .context(HeedSnafu)?; - if let Some(db) = database { - Ok(db) - } else { - self.reli - .env() - .create_database_with_txn(Some(&self.name), &mut wtxn.0) - .context(HeedSnafu) - } } fn switch(&self) -> Option { @@ -283,22 +225,22 @@ where } /// reliability writeable transaction -pub struct ReDbRwTxn<'e, 'p>(pub RwTxn<'e, 'p>); +pub struct ReDbRwTxn(); -impl<'e, 'p> ReDbRwTxn<'e, 'p> { +impl ReDbRwTxn { /// - pub fn new(env: &'e Env) -> heed::Result { - env.write_txn().map(ReDbRwTxn) + pub fn new() -> ReDbRwTxn { + ReDbRwTxn {} } } /// reliability read-only transaction -pub struct ReDbRoTxn<'e>(pub RoTxn<'e>); +pub struct ReDbRoTxn(); -impl<'e> ReDbRoTxn<'e> { +impl ReDbRoTxn { /// - pub fn new(env: &'e Env) -> heed::Result { - env.read_txn().map(ReDbRoTxn) + pub fn new() -> ReDbRoTxn { + ReDbRoTxn {} } } diff --git a/core/libcore/src/rel/history.rs b/core/libcore/src/rel/history.rs index be896179..6ebaaee6 100755 --- a/core/libcore/src/rel/history.rs +++ b/core/libcore/src/rel/history.rs @@ -11,7 +11,6 @@ // See the Mulan PSL v2 for more details. use super::base::{ReDbRwTxn, ReDbTable}; -use heed::Env; use std::cell::RefCell; use std::collections::HashMap; use std::fmt; @@ -19,7 +18,6 @@ use std::rc::Rc; pub struct ReliHistory { // associated objects - env: Rc, // control switch: RefCell>, @@ -31,27 +29,20 @@ pub struct ReliHistory { impl fmt::Debug for ReliHistory { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ReliHistory") - .field("env.path", &self.env.path()) .field("env.dbs.len", &self.dbs.borrow().len()) .finish() } } impl ReliHistory { - pub fn new(envr: &Rc) -> ReliHistory { + pub fn new() -> ReliHistory { ReliHistory { switch: RefCell::new(None), - env: Rc::clone(envr), dbs: RefCell::new(HashMap::new()), } } pub fn data_clear(&self) { - let mut db_wtxn = ReDbRwTxn::new(&self.env).expect("history.write_txn"); - for (_, db) in self.dbs.borrow().iter() { - db.clear(&mut db_wtxn); - } - db_wtxn.0.commit().expect("history.commit"); } pub fn db_register(&self, name: &str, db: Rc) { @@ -59,29 +50,16 @@ impl ReliHistory { } pub fn commit(&self) { - // create transaction - let mut db_wtxn = ReDbRwTxn::new(&self.env).expect("history.write_txn"); - - // export to db - for (_, db) in self.dbs.borrow().iter() { - db.export(&mut db_wtxn); - } - - // commit - db_wtxn.0.commit().expect("history.commit"); } pub(super) fn flush(&self, switch: bool) { // create transaction - let mut db_wtxn = ReDbRwTxn::new(&self.env).expect("history.write_txn"); + let mut db_wtxn = ReDbRwTxn::new(); // flush to db for (_, db) in self.dbs.borrow().iter() { db.flush(&mut db_wtxn, switch); } - - // commit - db_wtxn.0.commit().expect("history.commit"); } pub fn import(&self) { diff --git a/core/libcore/src/rel/mod.rs b/core/libcore/src/rel/mod.rs index 03f4c9a7..88f64626 100755 --- a/core/libcore/src/rel/mod.rs +++ b/core/libcore/src/rel/mod.rs @@ -22,10 +22,10 @@ mod api; mod base; #[cfg(debug)] mod debug; -mod enable; +//mod enable; mod history; -mod last; -mod pending; +//mod last; +//mod pending; mod station; /// -- Gitee