diff --git a/pack-rs/cache.rs b/pack-rs/cache.rs
new file mode 100644
index 0000000000000000000000000000000000000000..67ca42347dbc6c9e4306d31fa3978c2932bcaceb
--- /dev/null
+++ b/pack-rs/cache.rs
@@ -0,0 +1,262 @@
+//! Caching layer used by pack decoding.
+//!
+use std::path::Path;
+use std::path::PathBuf;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::{Arc, Mutex};
+use std::thread::sleep;
+use std::{fs, io};
+
+use crate::internal::pack::cache_object::{ArcWrapper, CacheObject, MemSizeRecorder};
+use crate::time_it;
+use dashmap::{DashMap, DashSet};
+use lru_mem::LruCache;
+use threadpool::ThreadPool;
+use venus::hash::SHA1;
+
+use super::cache_object::FileLoadStore;
+
+pub trait _Cache {
+    fn new(mem_size: Option<usize>, tmp_path: PathBuf, thread_num: usize) -> Self
+    where
+        Self: Sized;
+    fn get_hash(&self, offset: usize) -> Option<SHA1>;
+    fn insert(&self, offset: usize, hash: SHA1, obj: CacheObject) -> Arc<CacheObject>;
+    fn get_by_offset(&self, offset: usize) -> Option<Arc<CacheObject>>;
+    fn get_by_hash(&self, h: SHA1) -> Option<Arc<CacheObject>>;
+    fn total_inserted(&self) -> usize;
+    fn memory_used(&self) -> usize;
+    fn clear(&self);
+}
+
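+/// In-memory LRU cache of decoded objects, backed by temp files on disk.
+/// A minimal usage sketch (hypothetical sizes and paths, not part of this patch):
+/// ```ignore
+/// let cache = Caches::new(Some(2048), PathBuf::from("tests/.cache_tmp"), 2);
+/// let obj = CacheObject::default();
+/// let arc = cache.insert(obj.offset, obj.hash, obj); // keeps an Arc in the LRU
+/// let hit = cache.get_by_hash(arc.hash); // memory first, temp file as fallback
+/// assert!(hit.is_some());
+/// cache.clear(); // stop the IO threads & remove the temp dir
+/// ```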
+pub struct Caches {
+    map_offset: DashMap<usize, SHA1>, // offset to hash
+    hash_set: DashSet<SHA1>,          // items currently in the cache
+    // Dropping a large LRU cache takes a long time on Windows without multi-threaded IO,
+    // because the multi-threaded IO clones the Arc, so it won't be dropped in the main thread,
+    // and the `CacheObject`s will be reclaimed by the OS after the process ends abnormally.
+    // Solution: use `mimalloc`.
+    lru_cache: Mutex<LruCache<String, ArcWrapper<CacheObject>>>, // *lru_cache requires the key to implement the `MemSize` trait, so SHA1 is not used as the key*
+    mem_size: Option<usize>,
+    tmp_path: PathBuf,
+    pool: Arc<ThreadPool>,
+    complete_signal: Arc<AtomicBool>,
+}
+
+impl Caches {
+    /// Only gets the object from memory, not from the temp file.
+    fn try_get(&self, hash: SHA1) -> Option<Arc<CacheObject>> {
+        let mut map = self.lru_cache.lock().unwrap();
+        map.get(&hash.to_plain_str()).map(|x| x.data.clone())
+    }
+
+    /// !IMPORTANT: Because of how the pack is processed, the file must have been written (or be
+    /// being written) before, so this cannot deadlock.
+    /// Falls back to the temp file to get the item. **The invoker should ensure the hash is in
+    /// the cache, or this will block forever.**
+    fn get_fallback(&self, hash: SHA1) -> io::Result<Arc<CacheObject>> {
+        // read from the temp file
+        let obj = {
+            loop {
+                match self.read_from_temp(hash) {
+                    Ok(x) => break x,
+                    Err(e) if e.kind() == io::ErrorKind::NotFound => {
+                        sleep(std::time::Duration::from_millis(10)); // TODO: is there a better way than polling?
+                        continue;
+                    }
+                    Err(e) => return Err(e), // other error
+                }
+            }
+        };
+
+        let mut map = self.lru_cache.lock().unwrap();
+        let obj = Arc::new(obj);
+        let mut x = ArcWrapper::new(
+            obj.clone(),
+            self.complete_signal.clone(),
+            Some(self.pool.clone()),
+        );
+        x.set_store_path(Caches::generate_temp_path(&self.tmp_path, hash));
+        let _ = map.insert(hash.to_plain_str(), x); // TODO: handle the error
+        Ok(obj)
+    }
+
+    /// Generates the temp file path: the hex string of the hash.
+    fn generate_temp_path(tmp_path: &Path, hash: SHA1) -> PathBuf {
+        let mut path = tmp_path.to_path_buf();
+        path.push(hash.to_plain_str());
+        path
+    }
+
+    fn read_from_temp(&self, hash: SHA1) -> io::Result<CacheObject> {
+        let path = Self::generate_temp_path(&self.tmp_path, hash);
+        let obj = CacheObject::f_load(&path)?;
+        // Deserializing also creates an object, but without construction from the outside and without `::new()`,
+        // so if you want to do something while constructing, implement the Deserialize trait yourself.
+        obj.record_mem_size();
+        Ok(obj)
+    }
+
+    pub fn queued_tasks(&self) -> usize {
+        self.pool.queued_count()
+    }
+
+    /// Memory used by the index (excluding the lru_cache, which is accounted for by [CacheObject::get_mem_size()]).
+    pub fn memory_used_index(&self) -> usize {
+        self.map_offset.capacity() * (std::mem::size_of::<usize>() + std::mem::size_of::<SHA1>())
+            + self.hash_set.capacity() * std::mem::size_of::<SHA1>()
+    }
+}
+
+impl _Cache for Caches {
+    /// @param mem_size: the size of the in-memory LRU cache. **None means no limit.**
+    /// @param tmp_path: the directory in which to store the cache objects as temp files
+    fn new(mem_size: Option<usize>, tmp_path: PathBuf, thread_num: usize) -> Self
+    where
+        Self: Sized,
+    {
+        fs::create_dir_all(&tmp_path).unwrap();
+
+        Caches {
+            map_offset: DashMap::new(),
+            hash_set: DashSet::new(),
+            lru_cache: Mutex::new(LruCache::new(mem_size.unwrap_or(usize::MAX))),
+            mem_size,
+            tmp_path,
+            pool: Arc::new(ThreadPool::new(thread_num)),
+            complete_signal: Arc::new(AtomicBool::new(false)),
+        }
+    }
+
+    fn get_hash(&self, offset: usize) -> Option<SHA1> {
+        self.map_offset.get(&offset).map(|x| *x)
+    }
+
+    fn insert(&self, offset: usize, hash: SHA1, obj: CacheObject) -> Arc<CacheObject> {
+        let obj_arc = Arc::new(obj);
+        {
+            // ? whether to insert into the cache directly or only write to the temp file
+            let mut map = self.lru_cache.lock().unwrap();
+            let mut a_obj = ArcWrapper::new(
+                obj_arc.clone(),
+                self.complete_signal.clone(),
+                Some(self.pool.clone()),
+            );
+            a_obj.set_store_path(Caches::generate_temp_path(&self.tmp_path, hash));
+            let _ = map.insert(hash.to_plain_str(), a_obj);
+        }
+        // order matters, for the reads in `get_by_offset()`
+        self.hash_set.insert(hash);
+        self.map_offset.insert(offset, hash);
+
+        obj_arc
+    }
+
+    fn get_by_offset(&self, offset: usize) -> Option<Arc<CacheObject>> {
+        match self.map_offset.get(&offset) {
+            Some(x) => self.get_by_hash(*x),
+            None => None,
+        }
+    }
+
+    fn get_by_hash(&self, hash: SHA1) -> Option<Arc<CacheObject>> {
+        // check if the hash is in the cache (LRU or temp file)
+        if self.hash_set.contains(&hash) {
+            match self.try_get(hash) {
+                Some(x) => Some(x),
+                None => {
+                    if self.mem_size.is_none() {
+                        panic!("should not be here when mem_size is not set")
+                    }
+                    match self.get_fallback(hash) {
+                        Ok(x) => Some(x),
+                        Err(_) => None,
+                    }
+                }
+            }
+        } else {
+            None
+        }
+    }
+
+    fn total_inserted(&self) -> usize {
+        self.hash_set.len()
+    }
+    fn memory_used(&self) -> usize {
+        self.lru_cache.lock().unwrap().current_size()
+            + self.map_offset.capacity() * (std::mem::size_of::<usize>() + std::mem::size_of::<SHA1>())
+            + self.hash_set.capacity() * std::mem::size_of::<SHA1>()
+    }
+    fn clear(&self) {
+        time_it!("Caches clear", {
+            self.complete_signal.store(true, Ordering::SeqCst);
+            self.pool.join();
+            self.lru_cache.lock().unwrap().clear();
+            self.hash_set.clear();
+            self.map_offset.clear();
+        });
+
+        time_it!("Remove tmp dir", {
+            fs::remove_dir_all(&self.tmp_path).unwrap(); // very slow
+        });
+
+        assert_eq!(self.pool.queued_count(), 0);
+        assert_eq!(self.pool.active_count(), 0);
+        assert_eq!(self.lru_cache.lock().unwrap().len(), 0);
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::env;
+
+    use super::*;
+    use venus::hash::SHA1;
+
+    #[test]
+    fn test_cache_single_thread() {
+        let source = PathBuf::from(env::current_dir().unwrap().parent().unwrap());
+        let cache = Caches::new(Some(2048), source.clone().join("tests/.cache_tmp"), 1);
+        let a = CacheObject {
+            data_decompress: vec![0; 1024],
+            hash: SHA1::new(&String::from("a").into_bytes()),
+            ..Default::default()
+        };
+        let b = CacheObject {
+            data_decompress: vec![0; 1636],
+            hash: SHA1::new(&String::from("b").into_bytes()),
+            ..Default::default()
+        };
+        // insert a
+        cache.insert(a.offset, a.hash, a.clone());
+        assert!(cache.hash_set.contains(&a.hash));
+        assert!(cache.try_get(a.hash).is_some());
+
+        // insert b, which invalidates a
+        cache.insert(b.offset, b.hash, b.clone());
+        assert!(cache.hash_set.contains(&b.hash));
+        assert!(cache.try_get(b.hash).is_some());
+        assert!(cache.try_get(a.hash).is_none());
+
+        // get a, which invalidates b
+        let _ = cache.get_by_hash(a.hash);
+        assert!(cache.try_get(a.hash).is_some());
+        assert!(cache.try_get(b.hash).is_none());
+
+        // insert c, which is too large; a will remain in the cache
+        let c = CacheObject {
+            data_decompress: vec![0; 2049],
+            hash: SHA1::new(&String::from("c").into_bytes()),
+            ..Default::default()
+        };
+        cache.insert(c.offset, c.hash, c.clone());
+        assert!(cache.try_get(a.hash).is_some());
+        assert!(cache.try_get(b.hash).is_none());
+        assert!(cache.try_get(c.hash).is_none());
+        assert!(cache.get_by_hash(c.hash).is_some());
+    }
+}
diff --git a/pack-rs/cache_object.rs b/pack-rs/cache_object.rs
new file mode 100644
index 0000000000000000000000000000000000000000..eae044a2c75d6d9580f413f6c3640262fb8f5ba2
--- /dev/null
+++ b/pack-rs/cache_object.rs
@@ -0,0 +1,465 @@
+use std::fs::OpenOptions;
+use std::io::Write;
+use std::path::{Path, PathBuf};
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::{fs, io};
+use std::{ops::Deref, sync::Arc};
+
+use crate::internal::pack::utils;
+use lru_mem::{HeapSize, MemSize};
+use serde::{Deserialize, Serialize};
+use threadpool::ThreadPool;
+use venus::{hash::SHA1, internal::object::types::ObjectType};
+
+/// Records the heap size of all CacheObjects; used for the memory limit.
+static CACHE_OBJS_MEM_SIZE: AtomicUsize = AtomicUsize::new(0);
+
+/// File load & store trait.
+pub trait FileLoadStore: Serialize + for<'a> Deserialize<'a> {
+    fn f_load(path: &Path) -> Result<Self, io::Error>;
+    fn f_save(&self, path: &Path) -> Result<(), io::Error>;
+}
+// A trait alias, so that `impl FileLoadStore` == `impl Serialize + Deserialize`
+impl<T: Serialize + for<'a> Deserialize<'a>> FileLoadStore for T {
+    fn f_load(path: &Path) -> Result<T, io::Error> {
+        let data = fs::read(path)?;
+        let obj: T =
+            bincode::deserialize(&data).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
+        Ok(obj)
+    }
+    fn f_save(&self, path: &Path) -> Result<(), io::Error> {
+        if path.exists() {
+            return Ok(());
+        }
+        let data = bincode::serialize(&self).unwrap();
+        let path = path.with_extension("temp");
+        {
+            let mut file = OpenOptions::new()
+                .write(true)
+                .create_new(true)
+                .open(path.clone())?;
+            file.write_all(&data)?;
+        }
+        let final_path = path.with_extension("");
+        fs::rename(&path, final_path.clone())?;
+        Ok(())
+    }
+}
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CacheObject {
+    pub base_offset: usize,
+    pub base_ref: SHA1,
+    pub obj_type: ObjectType,
+    pub data_decompress: Vec<u8>,
+    pub offset: usize,
+    pub hash: SHA1,
+}
+// For convenience
+impl Default for CacheObject {
+    // It will be called in "struct update syntax": `..Default::default()`,
+    // so the mem-size recording should happen here!
+    fn default() -> Self {
+        let obj = CacheObject {
+            base_offset: 0,
+            base_ref: SHA1::default(),
+            data_decompress: Vec::new(),
+            obj_type: ObjectType::Blob,
+            offset: 0,
+            hash: SHA1::default(),
+        };
+        obj.record_mem_size();
+        obj
+    }
+}
+
+// ! Used by lru_mem to calculate the size of the object and limit the memory usage.
+// ! The implementation of HeapSize is not exact: it only counts the size of `data_decompress`.
+// Note that mem_size == value_size + heap_size, and we only need to impl HeapSize because value_size is known.
+impl HeapSize for CacheObject {
+    fn heap_size(&self) -> usize {
+        self.data_decompress.heap_size()
+    }
+}
+
+impl Drop for CacheObject {
+    // Check: the heap size subtracted on drop must equal the heap size that was recorded
+    // (the heap size must not change during the object's lifecycle)
+    fn drop(&mut self) {
+        // (&*self).heap_size() != self.heap_size()
+        CACHE_OBJS_MEM_SIZE.fetch_sub((*self).mem_size(), Ordering::SeqCst);
+    }
+}
+
+/// Heap-size recorder for a type (struct).
+/// Use a static variable to record the total mem size: add an object's mem size after
+/// construction and subtract it in `drop()`.
+/// Consequently, variable-size fields of an object should NOT be modified, to keep its heap
+/// size stable; alternatively, record the initial mem size in the object itself, or keep the
+/// record updated (not implemented).
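+/// A sketch of the intended contract (illustrative only; assumes a single thread):
+/// ```ignore
+/// let obj = CacheObject::default(); // `default()` already records its mem size
+/// let recorded = CacheObject::get_mem_size();
+/// drop(obj); // `Drop` subtracts the same amount again
+/// assert!(CacheObject::get_mem_size() < recorded);
+/// ```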
+pub trait MemSizeRecorder: MemSize {
+    fn record_mem_size(&self);
+    fn get_mem_size() -> usize;
+}
+
+impl MemSizeRecorder for CacheObject {
+    /// Records the mem size of this `CacheObj` in a `static` variable;
+    /// because of that, do NOT modify the `CacheObj` after recording.
+    fn record_mem_size(&self) {
+        CACHE_OBJS_MEM_SIZE.fetch_add(self.mem_size(), Ordering::SeqCst);
+    }
+
+    fn get_mem_size() -> usize {
+        CACHE_OBJS_MEM_SIZE.load(Ordering::SeqCst)
+    }
+}
+
+impl CacheObject {
+    /// Creates a new CacheObject which is neither an offset-delta nor a hash-delta.
+    pub fn new_for_undeltified(obj_type: ObjectType, data: Vec<u8>, offset: usize) -> Self {
+        let hash = utils::calculate_object_hash(obj_type, &data);
+        CacheObject {
+            data_decompress: data,
+            obj_type,
+            offset,
+            hash,
+            ..Default::default()
+        }
+    }
+
+    /// Transforms the CacheObject into a venus::internal::pack::entry::Entry.
+    pub fn to_entry(&self) -> venus::internal::pack::entry::Entry {
+        match self.obj_type {
+            ObjectType::Blob | ObjectType::Tree | ObjectType::Commit | ObjectType::Tag => {
+                venus::internal::pack::entry::Entry {
+                    obj_type: self.obj_type,
+                    data: self.data_decompress.clone(),
+                    hash: self.hash,
+                }
+            }
+            _ => {
+                unreachable!("delta object should not persist!")
+            }
+        }
+    }
+}
+
+/// Trait alias for simple use.
+pub trait ArcWrapperBounds:
+    HeapSize + Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static
+{
+}
+// You must impl the alias trait for all `T` satisfying the constraints;
+// otherwise `T` will not satisfy the alias trait even if it satisfies the original traits.
+impl<T: HeapSize + Serialize + for<'a> Deserialize<'a> + Send + Sync + 'static> ArcWrapperBounds
+    for T
+{
+}
+
+/// ! Encapsulates an Arc so that the third-party trait HeapSize can be implemented for the Arc type.
+/// ! Because Arc is used in the LruCache, the LruCache cannot know whether evicting a pointer
+/// ! will drop the referenced content, so the actual memory usage is not exact.
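+/// A minimal sketch of the drop-to-disk behavior (hypothetical path; no thread pool):
+/// ```ignore
+/// let flag = Arc::new(AtomicBool::new(false)); // false: decoding still in progress
+/// let mut w = ArcWrapper::new(Arc::new(1024), flag.clone(), None);
+/// w.set_store_path(PathBuf::from(".cache_temp/obj"));
+/// drop(w); // serialized to ".cache_temp/obj" by `Drop`
+/// flag.store(true, Ordering::SeqCst); // once true, later drops skip the save
+/// ```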
+pub struct ArcWrapper<T: ArcWrapperBounds> {
+    pub data: Arc<T>,
+    complete_signal: Arc<AtomicBool>,
+    pool: Option<Arc<ThreadPool>>,
+    pub store_path: Option<PathBuf>, // the path to store to on drop
+}
+impl<T: ArcWrapperBounds> ArcWrapper<T> {
+    /// Creates a new ArcWrapper.
+    pub fn new(data: Arc<T>, share_flag: Arc<AtomicBool>, pool: Option<Arc<ThreadPool>>) -> Self {
+        ArcWrapper {
+            data,
+            complete_signal: share_flag,
+            pool,
+            store_path: None,
+        }
+    }
+    pub fn set_store_path(&mut self, path: PathBuf) {
+        self.store_path = Some(path);
+    }
+}
+
+impl<T: ArcWrapperBounds> HeapSize for ArcWrapper<T> {
+    fn heap_size(&self) -> usize {
+        self.data.heap_size()
+    }
+}
+
+impl<T: ArcWrapperBounds> Clone for ArcWrapper<T> {
+    /// Clone does not clone the store_path.
+    fn clone(&self) -> Self {
+        ArcWrapper {
+            data: self.data.clone(),
+            complete_signal: self.complete_signal.clone(),
+            pool: self.pool.clone(),
+            store_path: None,
+        }
+    }
+}
+
+impl<T: ArcWrapperBounds> Deref for ArcWrapper<T> {
+    type Target = Arc<T>;
+    fn deref(&self) -> &Self::Target {
+        &self.data
+    }
+}
+impl<T: ArcWrapperBounds> Drop for ArcWrapper<T> {
+    // `drop` will be called in `lru_cache.insert()` when the cache is full and ejects the LRU entry;
+    // `lru_cache.insert()` is protected by a Mutex
+    fn drop(&mut self) {
+        if !self.complete_signal.load(Ordering::SeqCst) {
+            if let Some(path) = &self.store_path {
+                match &self.pool {
+                    Some(pool) => {
+                        let data_copy = self.data.clone();
+                        let path_copy = path.clone();
+                        let complete_signal = self.complete_signal.clone();
+                        // Block the entire process to wait for IO and control memory usage;
+                        // the queue size influences the memory usage
+                        while pool.queued_count() > 2000 {
+                            std::thread::yield_now();
+                        }
+                        pool.execute(move || {
+                            if !complete_signal.load(Ordering::SeqCst) {
+                                let res = data_copy.f_save(&path_copy);
+                                if let Err(e) = res {
+                                    println!("[f_save] {:?} error: {:?}", path_copy, e);
+                                }
+                            }
+                        });
+                    }
+                    None => {
+                        let res = self.data.f_save(path);
+                        if let Err(e) = res {
+                            println!("[f_save] {:?} error: {:?}", path, e);
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+#[cfg(test)]
+mod test {
+    use std::{fs, sync::Mutex};
+
+    use lru_mem::LruCache;
+
+    use super::*;
+    #[test]
+    #[ignore = "only in single thread"]
+    // only test in a single thread
+    fn test_heap_size_record() {
+        let obj = CacheObject {
+            data_decompress: vec![0; 1024],
+            ..Default::default()
+        };
+        obj.record_mem_size();
+        assert_eq!(CacheObject::get_mem_size(), 1024);
+        drop(obj);
+        assert_eq!(CacheObject::get_mem_size(), 0);
+    }
+
+    #[test]
+    fn test_cache_object_with_same_size() {
+        let a = CacheObject {
+            base_offset: 0,
+            base_ref: SHA1::new(&vec![0; 20]),
+            data_decompress: vec![0; 1024],
+            obj_type: ObjectType::Blob,
+            offset: 0,
+            hash: SHA1::new(&vec![0; 20]),
+        };
+        assert!(a.heap_size() == 1024);
+
+        // let b = ArcWrapper(Arc::new(a.clone()));
+        let b = ArcWrapper::new(Arc::new(a.clone()), Arc::new(AtomicBool::new(false)), None);
+        assert!(b.heap_size() == 1024);
+    }
+    #[test]
+    fn test_cache_object_with_lru() {
+        let mut cache = LruCache::new(2048);
+        let a = CacheObject {
+            base_offset: 0,
+            base_ref: SHA1::new(&vec![0; 20]),
+            data_decompress: vec![0; 1024],
+            obj_type: ObjectType::Blob,
+            offset: 0,
+            hash: SHA1::new(&vec![0; 20]),
+        };
+        println!("a.heap_size() = {}", a.heap_size());
+
+        let b = CacheObject {
+            base_offset: 0,
+            base_ref: SHA1::new(&vec![0; 20]),
+            data_decompress: vec![0; (1024.0 * 1.5) as usize],
+            obj_type: ObjectType::Blob,
+            offset: 0,
+            hash: SHA1::new(&vec![1; 20]),
+        };
+        {
+            let r = cache.insert(
+                a.hash.to_plain_str(),
+                ArcWrapper::new(Arc::new(a.clone()), Arc::new(AtomicBool::new(true)), None),
+            );
+            assert!(r.is_ok())
+        }
+        {
+            let r = cache.try_insert(
+                b.clone().hash.to_plain_str(),
+                ArcWrapper::new(Arc::new(b.clone()), Arc::new(AtomicBool::new(true)), None),
+            );
+            assert!(r.is_err());
+            if let Err(lru_mem::TryInsertError::WouldEjectLru { .. }) = r {
+                // matched the expected error; nothing else to do
+            } else {
+                panic!("Expected WouldEjectLru error");
+            }
+            let r = cache.insert(
+                b.hash.to_plain_str(),
+                ArcWrapper::new(Arc::new(b.clone()), Arc::new(AtomicBool::new(true)), None),
+            );
+            assert!(r.is_ok());
+        }
+        {
+            // a should be ejected
+            let r = cache.get(&a.hash.to_plain_str());
+            assert!(r.is_none());
+        }
+    }
+
+    #[derive(Serialize, Deserialize)]
+    struct Test {
+        a: usize,
+    }
+    impl Drop for Test {
+        fn drop(&mut self) {
+            println!("drop Test");
+        }
+    }
+    impl HeapSize for Test {
+        fn heap_size(&self) -> usize {
+            self.a
+        }
+    }
+    #[test]
+    fn test_lru_drop() {
+        println!("insert a");
+        let cache = LruCache::new(2048);
+        let cache = Arc::new(Mutex::new(cache));
+        {
+            let mut c = cache.as_ref().lock().unwrap();
+            let _ = c.insert(
+                "a",
+                ArcWrapper::new(
+                    Arc::new(Test { a: 1024 }),
+                    Arc::new(AtomicBool::new(true)),
+                    None,
+                ),
+            );
+        }
+        println!("insert b, a should be ejected");
+        {
+            let mut c = cache.as_ref().lock().unwrap();
+            let _ = c.insert(
+                "b",
+                ArcWrapper::new(
+                    Arc::new(Test { a: 1200 }),
+                    Arc::new(AtomicBool::new(true)),
+                    None,
+                ),
+            );
+        }
+        let b = {
+            let mut c = cache.as_ref().lock().unwrap();
+            c.get("b").cloned()
+        };
+        println!("insert c, b should not be ejected");
+        {
+            let mut c = cache.as_ref().lock().unwrap();
+            let _ = c.insert(
+                "c",
+                ArcWrapper::new(
+                    Arc::new(Test { a: 1200 }),
+                    Arc::new(AtomicBool::new(true)),
+                    None,
+                ),
+            );
+        }
+        println!("use b: {}", b.as_ref().unwrap().a);
+        println!("test over, eject all");
+    }
+
+    #[test]
+    fn test_cache_object_serialize() {
+        let a = CacheObject {
+            base_offset: 0,
+            base_ref: SHA1::new(&vec![0; 20]),
+            data_decompress: vec![0; 1024],
+            obj_type: ObjectType::Blob,
+            offset: 0,
+            hash: SHA1::new(&vec![0; 20]),
+        };
+        let s = bincode::serialize(&a).unwrap();
+        let b: CacheObject = bincode::deserialize(&s).unwrap();
+        assert!(a.base_offset == b.base_offset);
+    }
+
+    #[test]
+    fn test_arc_wrapper_drop_store() {
+        let mut path = PathBuf::from(".cache_temp/test_arc_wrapper_drop_store");
+        fs::create_dir_all(&path).unwrap();
+        path.push("test_obj");
+        let mut a = ArcWrapper::new(Arc::new(1024), Arc::new(AtomicBool::new(false)), None);
+        a.set_store_path(path.clone());
+        drop(a);
+
+        assert!(path.exists());
+        path.pop();
+        fs::remove_dir_all(path).unwrap();
+    }
+
+    #[test]
+    /// Tests that the wrapper correctly stores the data when the LRU ejects it.
+    fn test_arc_wrapper_with_lru() {
+        let mut cache = LruCache::new(1500);
+        let path = PathBuf::from(".cache_temp/test_arc_wrapper_with_lru");
+        let _ = fs::remove_dir_all(&path);
+        fs::create_dir_all(&path).unwrap();
+        let shared_flag = Arc::new(AtomicBool::new(false));
+
+        // insert a; a is not ejected
+        let a_path = path.join("a");
+        {
+            let mut a = ArcWrapper::new(Arc::new(Test { a: 1024 }), shared_flag.clone(), None);
+            a.set_store_path(a_path.clone());
+            let b = ArcWrapper::new(Arc::new(1024), shared_flag.clone(), None);
+            assert!(b.store_path.is_none());
+
+            println!("insert a with heap size: {:?}", a.heap_size());
+            let rt = cache.insert("a", a);
+            if let Err(e) = rt {
+                panic!("insert a failed: {:?}", e.to_string());
+            }
+            println!("after insert a, cache used = {}", cache.current_size());
+        }
+        assert!(!a_path.exists());
+
+        let b_path = path.join("b");
+        // insert b; a should be ejected
+        {
+            let mut b = ArcWrapper::new(Arc::new(Test { a: 996 }), shared_flag.clone(), None);
+            b.set_store_path(b_path.clone());
+            let rt = cache.insert("b", b);
+            if let Err(e) = rt {
+                panic!("insert b failed: {:?}", e.to_string());
+            }
+            println!("after insert b, cache used = {}", cache.current_size());
+        }
+        assert!(a_path.exists());
+        assert!(!b_path.exists());
+        shared_flag.store(true, Ordering::SeqCst);
+        fs::remove_dir_all(path).unwrap();
+        // should pass even if b's path does not exist
+    }
+}
diff --git a/pack-rs/decode.rs b/pack-rs/decode.rs
new file mode 100644
index 0000000000000000000000000000000000000000..62d505182c11f914c74a1e055d2ce0f49cc905b0
--- /dev/null
+++ b/pack-rs/decode.rs
@@ -0,0 +1,684 @@
+//! Decoding of Git pack files, with multi-threaded delta resolution.
+//!
+use std::io::{self, BufRead, Cursor, ErrorKind, Read, Seek};
+use std::path::PathBuf;
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::mpsc::Sender;
+use std::sync::Arc;
+use std::thread::{self, sleep};
+use std::time::Instant;
+
+use flate2::bufread::ZlibDecoder;
+use threadpool::ThreadPool;
+
+use venus::errors::GitError;
+use venus::hash::SHA1;
+use venus::internal::object::types::ObjectType;
+use venus::internal::pack::entry::Entry;
+
+use super::cache::_Cache;
+use crate::internal::pack::cache::Caches;
+use crate::internal::pack::cache_object::{CacheObject, MemSizeRecorder};
+use crate::internal::pack::waitlist::Waitlist;
+use crate::internal::pack::wrapper::Wrapper;
+use crate::internal::pack::{utils, Pack};
+use uuid::Uuid;
+
+impl Pack {
+    /// # Parameters
+    /// - `thread_num`: The number of threads used for decoding and caching; `None` means the
+    ///   number of logical CPUs. It cannot be zero, or `new` will panic.
+    ///   For example, `thread_num = 4` will use up to 8 threads (4 for decoding and 4 for caching).
+    /// - `mem_limit`: The maximum size of the memory cache in bytes, or `None` for unlimited;
+    ///   80% of it will be used by [Caches].
+    ///   Not exact: because of memory alignment and other reasons, about 15% more may be used.
+    /// - `temp_path`: The path to a directory for temporary files; defaults to "./.cache_temp".
+    ///
+    /// # !IMPORTANT:
+    /// Multiple `Pack`s can't decode at the same time, because the memory limit uses a shared
+    /// static variable while each `Pack` has its own cache, which would cause a "deadlock".
+    pub fn new(thread_num: Option<usize>, mem_limit: Option<usize>, temp_path: Option<PathBuf>) -> Self {
+        let mut temp_path = temp_path.unwrap_or(PathBuf::from("./.cache_temp"));
+        temp_path.push(Uuid::new_v4().to_string());
+        let thread_num = thread_num.unwrap_or_else(num_cpus::get);
+        let cache_mem_size = mem_limit.map(|mem_limit| mem_limit * 4 / 5);
+        Pack {
+            number: 0,
+            signature: SHA1::default(),
+            objects: Vec::new(),
+            pool: Arc::new(ThreadPool::new(thread_num)),
+            waitlist: Arc::new(Waitlist::new()),
+            caches: Arc::new(Caches::new(cache_mem_size, temp_path, thread_num)),
+            mem_limit: mem_limit.unwrap_or(usize::MAX),
+        }
+    }
+
+    /// Checks and reads the header of a Git pack file.
+    ///
+    /// This function reads the first 12 bytes of a pack file, which include the "PACK" magic identifier,
+    /// the version number, and the number of objects in the pack. It verifies that the magic identifier
+    /// is correct and that the version number is 2 (which is the version currently supported by Git).
+    /// It also collects these header bytes for later use, such as for hashing the entire pack file.
+    ///
+    /// # Parameters
+    /// * `pack`: A mutable reference to an object implementing the `Read` trait,
+    ///   representing the source of the pack file data (e.g., file, memory stream).
+    ///
+    /// # Returns
+    /// A `Result` which is:
+    /// * `Ok((u32, Vec<u8>))`: On successful reading and validation of the header, returns a tuple where:
+    ///   - The first element is the number of objects in the pack file (`u32`).
+    ///   - The second element is a vector containing the bytes of the pack file header (`Vec<u8>`).
+    /// * `Err(GitError)`: On failure, returns a `GitError` with a description of the issue.
+    ///
+    /// # Errors
+    /// This function can return an error in the following situations:
+    /// * If the pack file does not start with the "PACK" magic identifier.
+    /// * If the pack file's version number is not 2.
+    /// * If there are any issues reading from the provided `pack` source.
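+    /// A minimal sketch (synthetic 12-byte header: "PACK", version 2, zero objects):
+    /// ```ignore
+    /// let mut header = Cursor::new(b"PACK\x00\x00\x00\x02\x00\x00\x00\x00".to_vec());
+    /// let (obj_num, raw) = Pack::check_header(&mut header)?;
+    /// assert_eq!((obj_num, raw.len()), (0, 12));
+    /// ```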
+    pub fn check_header(pack: &mut (impl Read + BufRead)) -> Result<(u32, Vec<u8>), GitError> {
+        // A vector to store the header data for hashing later
+        let mut header_data = Vec::new();
+
+        // Read the first 4 bytes which should be "PACK"
+        let mut magic = [0; 4];
+        // Read the magic "PACK" identifier
+        let result = pack.read_exact(&mut magic);
+        match result {
+            Ok(_) => {
+                // Store these bytes for later
+                header_data.extend_from_slice(&magic);
+
+                // Check if the magic bytes match "PACK"
+                if magic != *b"PACK" {
+                    // If not, return an error indicating an invalid pack header
+                    return Err(GitError::InvalidPackHeader(format!(
+                        "{},{},{},{}",
+                        magic[0], magic[1], magic[2], magic[3]
+                    )));
+                }
+            },
+            Err(_e) => {
+                // If there is an error in reading, return a GitError
+                return Err(GitError::InvalidPackHeader(format!(
+                    "{},{},{},{}",
+                    magic[0], magic[1], magic[2], magic[3]
+                )));
+            }
+        }
+
+        // Read the next 4 bytes for the version number
+        let mut version_bytes = [0; 4];
+        let result = pack.read_exact(&mut version_bytes); // Read the version number
+        match result {
+            Ok(_) => {
+                // Store these bytes
+                header_data.extend_from_slice(&version_bytes);
+
+                // Convert the version bytes to a u32 integer
+                let version = u32::from_be_bytes(version_bytes);
+                if version != 2 {
+                    // Git currently supports version 2, so error if not version 2
+                    return Err(GitError::InvalidPackFile(format!(
+                        "Version Number is {}, not 2",
+                        version
+                    )));
+                }
+                // If the read is successful, proceed
+            },
+            Err(_e) => {
+                // If there is an error in reading, return a GitError
+                return Err(GitError::InvalidPackHeader(format!(
+                    "{},{},{},{}",
+                    version_bytes[0], version_bytes[1], version_bytes[2], version_bytes[3]
+                )));
+            }
+        }
+
+        // Read the next 4 bytes for the number of objects in the pack
+        let mut object_num_bytes = [0; 4];
+        // Read the number of objects
+        let result = pack.read_exact(&mut object_num_bytes);
+        match result {
+            Ok(_) => {
+                // Store these bytes
+                header_data.extend_from_slice(&object_num_bytes);
+                // Convert the object number bytes to a u32 integer
+                let object_num = u32::from_be_bytes(object_num_bytes);
+                // Return the number of objects and the header data for further processing
+                Ok((object_num, header_data))
+            },
+            Err(_e) => {
+                // If there is an error in reading, return a GitError
+                Err(GitError::InvalidPackHeader(format!(
+                    "{},{},{},{}",
+                    object_num_bytes[0], object_num_bytes[1], object_num_bytes[2], object_num_bytes[3]
+                )))
+            }
+        }
+    }
+
+    /// Decompresses data from a given Read and BufRead source using Zlib decompression.
+    ///
+    /// # Parameters
+    /// * `pack`: A source that implements both Read and BufRead traits (e.g., file, network stream).
+    /// * `expected_size`: The expected decompressed size of the data.
+    ///
+    /// # Returns
+    /// Returns a `Result` containing either:
+    /// * A tuple with a `Vec<u8>` of decompressed data and the total number of input bytes processed,
+    /// * Or a `GitError` in case of a mismatch in expected size or any other reading error.
+    ///
+    pub fn decompress_data(&mut self, pack: &mut (impl Read + BufRead + Send), expected_size: usize) -> Result<(Vec<u8>, usize), GitError> {
+        // Create a buffer with the expected size for the decompressed data
+        let mut buf = Vec::with_capacity(expected_size);
+        // Create a new Zlib decoder with the original data
+        let mut deflate = ZlibDecoder::new(pack);
+
+        // Attempt to read data to the end of the buffer
+        match deflate.read_to_end(&mut buf) {
+            Ok(_) => {
+                // Check if the length of the buffer matches the expected size
+                if buf.len() != expected_size {
+                    Err(GitError::InvalidPackFile(format!(
+                        "The object size {} does not match the expected size {}",
+                        buf.len(),
+                        expected_size
+                    )))
+                } else {
+                    // If everything is as expected, return the buffer and the total number of input bytes processed
+                    Ok((buf, deflate.total_in() as usize))
+                    // TODO: this will likely be smaller than what the decompressor actually read from the underlying stream due to buffering.
+                }
+            },
+            Err(e) => {
+                // If there is an error in reading, return a GitError
+                Err(GitError::InvalidPackFile(format!("Decompression error: {}", e)))
+            }
+        }
+    }
+
+    /// Decodes a single pack object from a given Read and BufRead source.
+    ///
+    /// # Parameters
+    /// * `pack`: A source that implements both Read and BufRead traits.
+    /// * `offset`: A mutable reference to the current offset within the pack.
+    ///
+    /// # Returns
+    /// Returns a `Result` containing either:
+    /// * The decoded [CacheObject] (with `offset` advanced past the object),
+    /// * Or a `GitError` in case of any reading or decompression error.
+    ///
+    pub fn decode_pack_object(&mut self, pack: &mut (impl Read + BufRead + Send), offset: &mut usize) -> Result<CacheObject, GitError> {
+        let init_offset = *offset;
+
+        // Attempt to read the type and size, handle potential errors
+        let (type_bits, size) = match utils::read_type_and_varint_size(pack, offset) {
+            Ok(result) => result,
+            Err(e) => {
+                // Handle the error, e.g., by logging it or converting it to GitError,
+                // and then return from the function
+                return Err(GitError::InvalidPackFile(format!("Read error: {}", e)));
+            }
+        };
+
+        // Check if the object type is valid
+        let t = ObjectType::from_u8(type_bits)?;
+
+        // Utility lambda: return the data with the post-rebuild capacity, for memory control
+        let reserve_delta_data = |data: Vec<u8>| -> Vec<u8> {
+            let result_size = { // Read the `result-size` of the delta object
+                let mut reader = Cursor::new(&data);
+                let _ = utils::read_varint_le(&mut reader).unwrap().0; // base_size
+                utils::read_varint_le(&mut reader).unwrap().0 // size after rebuilding
+            };
+            // capacity() == result_size, len() == data.len()
+            // just for exact memory control (relies on `heap_size()`, which is based on capacity).
+            // Seems wasteful temporarily, but it serves the final memory limit.
+            let mut data_result_cap = Vec::with_capacity(result_size as usize);
+            data_result_cap.extend(data);
+            data_result_cap
+        };
+
+        match t {
+            ObjectType::Commit | ObjectType::Tree | ObjectType::Blob | ObjectType::Tag => {
+                let (data, raw_size) = self.decompress_data(pack, size)?;
+                *offset += raw_size;
+                Ok(CacheObject::new_for_undeltified(t, data, init_offset))
+            },
+            ObjectType::OffsetDelta => {
+                let (delta_offset, bytes) = utils::read_offset_encoding(pack).unwrap();
+                *offset += bytes;
+
+                let (data, raw_size) = self.decompress_data(pack, size)?;
+                *offset += raw_size;
+
+                // Compute the base object offset: the current offset - delta offset
+                let base_offset = init_offset
+                    .checked_sub(delta_offset as usize)
+                    .ok_or_else(|| {
+                        GitError::InvalidObjectInfo("Invalid OffsetDelta offset".to_string())
+                    })
+                    .unwrap();
+
+                Ok(CacheObject {
+                    base_offset,
+                    data_decompress: reserve_delta_data(data),
+                    obj_type: t,
+                    offset: init_offset,
+                    ..Default::default()
+                })
+            },
+            ObjectType::HashDelta => {
+                // Read 20 bytes to get the reference object's SHA1 hash
+                let mut buf_ref = [0; 20];
+                pack.read_exact(&mut buf_ref).unwrap();
+                let ref_sha1 = SHA1::from_bytes(buf_ref.as_ref()); // TODO: SHA1::from_stream()
+                // The offset is incremented by 20 bytes
+                *offset += 20; // TODO: replace with a constant
+
+                let (data, raw_size) = self.decompress_data(pack, size)?;
+                *offset += raw_size;
+
+                Ok(CacheObject {
+                    base_ref: ref_sha1,
+                    data_decompress: reserve_delta_data(data),
+                    obj_type: t,
+                    offset: init_offset,
+                    ..Default::default()
+                })
+            }
+        }
+    }
+
+    /// Decodes a pack file from a given Read and BufRead source, caching the decoded objects
+    /// and optionally sending the resulting entries through `sender`.
+    ///
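+    /// A minimal usage sketch (hypothetical pack path; entries drained on another thread):
+    /// ```ignore
+    /// let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(PathBuf::from("/tmp/.cache_temp")));
+    /// let (tx, rx) = std::sync::mpsc::channel();
+    /// let counter = std::thread::spawn(move || rx.iter().count());
+    /// let mut reader = BufReader::new(File::open("tests/data/packs/example.pack").unwrap());
+    /// p.decode(&mut reader, Some(tx)).unwrap();
+    /// println!("decoded {} entries", counter.join().unwrap());
+    /// ```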
+    pub fn decode(&mut self, pack: &mut (impl Read + BufRead + Seek + Send), sender: Option<Sender<Entry>>) -> Result<(), GitError> {
+        let time = Instant::now();
+
+        // let tmp_path = tmp_path.join(Uuid::new_v4().to_string()); // maybe Snowflake or ULID is better (fewer collisions)
+        // let caches = Arc::new(Caches::new(Some(mem_size), Some(tmp_path.clone()), self.pool.max_count()));
+        let caches = self.caches.clone();
+        let mut reader = Wrapper::new(io::BufReader::new(pack));
+
+        let result = Pack::check_header(&mut reader);
+        match result {
+            Ok((object_num, _)) => {
+                self.number = object_num as usize;
+            },
+            Err(e) => {
+                return Err(e);
+            }
+        }
+        println!("The pack file has {} objects", self.number);
+
+        let mut offset: usize = 12;
+        let i = Arc::new(AtomicUsize::new(1));
+
+        // debug log thread
+        #[cfg(debug_assertions)]
+        let stop = Arc::new(AtomicBool::new(false));
+        #[cfg(debug_assertions)]
+        { // LOG
+            let log_pool = self.pool.clone();
+            let log_cache = caches.clone();
+            let log_i = i.clone();
+            let log_stop = stop.clone();
+            // print a log line per second
+            thread::spawn(move || {
+                let time = Instant::now();
+                loop {
+                    if log_stop.load(Ordering::Relaxed) {
+                        break;
+                    }
+                    println!("time {:?} s \t pass: {:?}, \t dec-num: {} \t cah-num: {} \t Objs: {} MB \t CacheUsed: {} MB",
+                             time.elapsed().as_millis() as f64 / 1000.0, log_i.load(Ordering::Relaxed), log_pool.queued_count(), log_cache.queued_tasks(),
+                             CacheObject::get_mem_size() / 1024 / 1024,
+                             log_cache.memory_used() / 1024 / 1024);
+
+                    sleep(std::time::Duration::from_secs(1));
+                }
+            });
+        } // LOG
+
+        while i.load(Ordering::Relaxed) <= self.number {
+            // 3 parts: Waitlist + ThreadPool + Caches
+            // hard-coded limit on the thread pool's task queue, to limit memory
+            while self.memory_used() > self.mem_limit || self.pool.queued_count() > 2000 {
+                thread::yield_now();
+            }
+            let r: Result<CacheObject, GitError> = self.decode_pack_object(&mut reader, &mut offset);
+            match r {
+                Ok(obj) => {
+                    obj.record_mem_size();
+
+                    let caches = caches.clone();
+                    let pool = self.pool.clone();
+                    let waitlist = self.waitlist.clone();
+                    let sender = sender.clone();
+                    self.pool.execute(move || {
+                        match obj.obj_type {
+                            ObjectType::Commit | ObjectType::Tree | ObjectType::Blob | ObjectType::Tag => {
+                                let obj = Self::cache_obj_and_process_waitlist(pool, waitlist, caches, obj, sender.clone());
+                                if let Some(sender) = sender {
+                                    sender.send(obj.to_entry()).unwrap();
+                                }
+                            },
+                            ObjectType::OffsetDelta => {
+                                if let Some(base_obj) = caches.get_by_offset(obj.base_offset) {
+                                    Self::process_delta(pool, waitlist, caches, obj, base_obj, sender);
+                                } else {
+                                    // You can delete this 'if' block ↑, because there is a second check in the 'else';
+                                    // it would be more readable, but the performance would be slightly reduced
+                                    let base_offset = obj.base_offset;
+                                    waitlist.insert_offset(obj.base_offset, obj);
+                                    // Second check: prevents the base_obj thread from finishing before the waitlist insert
+                                    if let Some(base_obj) = caches.get_by_offset(base_offset) {
+                                        Self::process_waitlist(pool, waitlist, caches, base_obj, sender);
+                                    }
+                                }
+                            },
+                            ObjectType::HashDelta => {
+                                if let Some(base_obj) = caches.get_by_hash(obj.base_ref) {
+                                    Self::process_delta(pool, waitlist, caches, obj, base_obj, sender);
+                                } else {
+                                    let base_ref = obj.base_ref;
+                                    waitlist.insert_ref(obj.base_ref, obj);
+                                    if let Some(base_obj) = caches.get_by_hash(base_ref) {
+                                        Self::process_waitlist(pool, waitlist, caches, base_obj, sender);
+                                    }
+                                }
+                            }
+                        }
+                    });
+                },
+                Err(e) => {
+                    return Err(e);
+                }
+            }
+            i.fetch_add(1, Ordering::Relaxed);
+        }
+
+        let render_hash = reader.final_hash();
+        let mut trailer_buf = [0; 20];
+        reader.read_exact(&mut trailer_buf).unwrap();
+        self.signature = SHA1::from_bytes(trailer_buf.as_ref());
+
+        if render_hash != self.signature {
+            return Err(GitError::InvalidPackFile(format!(
+                "The pack file hash {} does not match the trailer hash {}",
+                render_hash.to_plain_str(),
+                self.signature.to_plain_str()
+            )));
+        }
+
+        let end = utils::is_eof(&mut reader);
+        if !end {
+            return Err(GitError::InvalidPackFile(
+                "The pack file is not at the end".to_string()
+            ));
+        }
+
+        self.pool.join(); // wait for all threads to finish
+        // !Attention: the Caches thread pool may not have stopped yet, but that's not a problem (garbage file data),
+        // so the number of files != self.number
+        assert_eq!(self.waitlist.map_offset.len(), 0);
+        assert_eq!(self.waitlist.map_ref.len(), 0);
+        assert_eq!(self.number, caches.total_inserted());
+        println!("The pack file has been decoded successfully");
+        println!("Pack decode takes: [ {:?} ]", time.elapsed());
+
+        self.caches.clear(); // clear the cached objects & stop the threads
+        assert_eq!(CacheObject::get_mem_size(), 0); // all the objs should have been dropped by here
+
+        #[cfg(debug_assertions)]
+        stop.store(true, Ordering::Relaxed);
+
+        Ok(())
+    }
+
+    /// CacheObjects + index size of Caches
+    fn memory_used(&self) -> usize {
+        CacheObject::get_mem_size() + self.caches.memory_used_index()
+    }
+
+    /// Rebuilds the delta object in a new thread & processes the objects waiting for it, recursively.
+    /// This function must be *static*, because `&self` can't be moved into a new thread.
+    fn process_delta(pool: Arc<ThreadPool>, waitlist: Arc<Waitlist>, caches: Arc<Caches>, delta_obj: CacheObject, base_obj: Arc<CacheObject>, sender: Option<Sender<Entry>>) {
+        pool.clone().execute(move || {
+            let new_obj = Pack::rebuild_delta(delta_obj, base_obj);
+            if let Some(sender) = sender.clone() {
+                sender.send(new_obj.to_entry()).unwrap();
+            }
+            Self::cache_obj_and_process_waitlist(pool, waitlist, caches, new_obj, sender); // indirect recursion
+        });
+    }
+
+    /// Caches the new object & processes the objects waiting for it (in multi-threading).
+    fn cache_obj_and_process_waitlist(pool: Arc<ThreadPool>, waitlist: Arc<Waitlist>, caches: Arc<Caches>, new_obj: CacheObject, sender: Option<Sender<Entry>>) -> Arc<CacheObject> {
+        let new_obj = caches.insert(new_obj.offset, new_obj.hash, new_obj);
+        Self::process_waitlist(pool, waitlist, caches, new_obj.clone(), sender);
+        new_obj
+    }
+
+    fn process_waitlist(pool: Arc<ThreadPool>, waitlist: Arc<Waitlist>, caches: Arc<Caches>, base_obj: Arc<CacheObject>, sender: Option<Sender<Entry>>) {
+        let wait_objs = waitlist.take(base_obj.offset, base_obj.hash);
+        for obj in wait_objs {
+            // Process the objects that were waiting for the new object (base_obj == new_obj)
+            Self::process_delta(pool.clone(), waitlist.clone(), caches.clone(), obj, base_obj.clone(), sender.clone());
+        }
+    }
+
+    /// Reconstructs the delta object based on the base object
+    /// and returns a new object.
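+    /// A worked example with illustrative bytes (not from a real pack):
+    /// with base = b"hello world", a delta stream of [0x0B, 0x05, 0x91, 0x06, 0x05]
+    /// yields b"world". 0x0B/0x05 are the varint-encoded base/result sizes, and
+    /// 0x91 = 0b1001_0001 is a copy instruction whose offset1 and size1 bytes are present,
+    /// i.e. copy size = 5 bytes from offset = 6 of the base object.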
+    pub fn rebuild_delta(delta_obj: CacheObject, base_obj: Arc<CacheObject>) -> CacheObject {
+        const COPY_INSTRUCTION_FLAG: u8 = 1 << 7;
+        const COPY_OFFSET_BYTES: u8 = 4;
+        const COPY_SIZE_BYTES: u8 = 3;
+        const COPY_ZERO_SIZE: usize = 0x10000;
+
+        let mut stream = Cursor::new(&delta_obj.data_decompress);
+
+        // Read the base object size & result size
+        // (size encoding)
+        let base_size = utils::read_varint_le(&mut stream).unwrap().0;
+        let result_size = utils::read_varint_le(&mut stream).unwrap().0;
+
+        // Get the base object's raw data
+        let base_info = &base_obj.data_decompress;
+        assert_eq!(base_info.len() as u64, base_size);
+
+        let mut result = Vec::with_capacity(result_size as usize);
+
+        loop {
+            // Check if the stream has ended, meaning the new object is done
+            let instruction = match utils::read_bytes(&mut stream) {
+                Ok([instruction]) => instruction,
+                Err(err) if err.kind() == ErrorKind::UnexpectedEof => break,
+                Err(err) => {
+                    panic!(
+                        "{}",
+                        GitError::DeltaObjectError(format!("Wrong instruction in delta: {}", err))
+                    );
+                }
+            };
+
+            if instruction & COPY_INSTRUCTION_FLAG == 0 {
+                // Data instruction; the instruction byte specifies the number of data bytes
+                if instruction == 0 {
+                    // Appending 0 bytes doesn't make sense, so git disallows it
+                    panic!(
+                        "{}",
+                        GitError::DeltaObjectError(String::from("Invalid data instruction"))
+                    );
+                }
+
+                // Append the provided bytes
+                let mut data = vec![0; instruction as usize];
+                stream.read_exact(&mut data).unwrap();
+                result.extend_from_slice(&data);
+            } else {
+                // Copy instruction
+                // +----------+---------+---------+---------+---------+-------+-------+-------+
+                // | 1xxxxxxx | offset1 | offset2 | offset3 | offset4 | size1 | size2 | size3 |
+                // +----------+---------+---------+---------+---------+-------+-------+-------+
+                let mut nonzero_bytes = instruction;
+                let offset = utils::read_partial_int(&mut stream, COPY_OFFSET_BYTES, &mut nonzero_bytes).unwrap();
+                let mut size = utils::read_partial_int(&mut stream, COPY_SIZE_BYTES, &mut nonzero_bytes).unwrap();
+                if size == 0 {
+                    // Copying 0 bytes doesn't make sense, so git assumes a different size
+                    size = COPY_ZERO_SIZE;
+                }
+                // Copy bytes from the base object
+                let base_data = base_info.get(offset..(offset + size)).ok_or_else(|| {
+                    GitError::DeltaObjectError("Invalid copy instruction".to_string())
+                });
+
+                match base_data {
+                    Ok(data) => result.extend_from_slice(data),
+                    Err(e) => panic!("{}", e),
+                }
+            }
+        }
+        assert_eq!(result_size, result.len() as u64);
+
+        let hash = utils::calculate_object_hash(base_obj.obj_type, &result);
+        // Create the new object from `delta_obj` & `result` instead of modifying `delta_obj`, for heap-size recording
+        let new_obj = CacheObject {
+            data_decompress: result,
+            obj_type: base_obj.obj_type, // same as the type of the base object
+            hash,
+            ..delta_obj
+        };
+        new_obj.record_mem_size();
+        new_obj // canonical form (complete object)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::fs;
+    use std::io::prelude::*;
+    use std::io::BufReader;
+    use std::io::Cursor;
+    use std::{env, path::PathBuf};
+
+    use flate2::write::ZlibEncoder;
+    use flate2::Compression;
+
+    use crate::internal::pack::Pack;
+
+    #[test]
+    fn test_pack_check_header() {
+        let mut source = PathBuf::from(env::current_dir().unwrap().parent().unwrap());
+        source.push("tests/data/packs/git-2d187177923cd618a75da6c6db45bb89d92bd504.pack");
+
+        let f = std::fs::File::open(source).unwrap();
+        let mut buf_reader = BufReader::new(f);
+        let (object_num, _) = Pack::check_header(&mut buf_reader).unwrap();
+
+        assert_eq!(object_num, 358109);
+    }
+
+    #[test]
+    fn test_decompress_data() {
+        let data = b"Hello, world!"; // Sample data to compress and then decompress
+        let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
+        encoder.write_all(data).unwrap();
+        let compressed_data = encoder.finish().unwrap();
+        let compressed_size = compressed_data.len();
+
+        // Create a cursor for the compressed data to simulate a Read + BufRead source
+        let mut cursor: Cursor<Vec<u8>> = Cursor::new(compressed_data);
+        let expected_size = data.len();
+
+        // Decompress the data and assert correctness
+        let mut p = Pack::new(None, None, None);
+        let result = p.decompress_data(&mut cursor, expected_size);
+        match result {
+            Ok((decompressed_data, bytes_read)) => {
+                assert_eq!(bytes_read, compressed_size);
+                assert_eq!(decompressed_data, data);
+            },
+            Err(e) => panic!("Decompression failed: {:?}", e),
+        }
+    }
+
+    #[test]
+    fn test_pack_decode_without_delta() {
+        let mut source = PathBuf::from(env::current_dir().unwrap().parent().unwrap());
+        source.push("tests/data/packs/pack-1d0e6c14760c956c173ede71cb28f33d921e232f.pack");
+
+        let tmp = PathBuf::from("/tmp/.cache_temp");
+
+        let f = std::fs::File::open(source).unwrap();
+        let mut buffered = BufReader::new(f);
+        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp));
+        p.decode(&mut buffered, None).unwrap();
+    }
+
+    #[test]
+    fn test_pack_decode_with_ref_delta() {
+        let mut source = PathBuf::from(env::current_dir().unwrap().parent().unwrap());
+        source.push("tests/data/packs/ref-delta-65d47638aa7cb7c39f1bd1d5011a415439b887a8.pack");
+
+        let tmp = PathBuf::from("/tmp/.cache_temp");
+
+        let f = std::fs::File::open(source).unwrap();
+        let mut buffered = BufReader::new(f);
+        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp));
+        p.decode(&mut buffered, None).unwrap();
+    }
+
+    #[test]
+    fn test_pack_decode_with_large_file_with_delta_without_ref() {
+        let mut source = PathBuf::from(env::current_dir().unwrap().parent().unwrap());
+        source.push("tests/data/packs/git-2d187177923cd618a75da6c6db45bb89d92bd504.pack");
+
+        let tmp = PathBuf::from("/tmp/.cache_temp");
+
+        let f = std::fs::File::open(source).unwrap();
+        let mut buffered = BufReader::new(f);
+        // let mut p = Pack::default(); //Pack::new(2);
+        let mut p = Pack::new(Some(20), Some(1024 * 1024 * 1024 * 4), Some(tmp.clone()));
+        let rt = p.decode(&mut buffered, None);
+        if let Err(e) = rt {
+            fs::remove_dir_all(tmp).unwrap();
+            panic!("Error: {:?}", e);
+        }
+    } // it will get stuck on dropping `Pack` on Windows if `mem_size` is None, so we need `mimalloc`
+
+    #[test]
+    fn test_pack_decode_with_delta_without_ref() {
+        let mut source = PathBuf::from(env::current_dir().unwrap().parent().unwrap());
+        source.push("tests/data/packs/pack-d50df695086eea6253a237cb5ac44af1629e7ced.pack");
+
+        let tmp = PathBuf::from("/tmp/.cache_temp");
+
+        let f = std::fs::File::open(source).unwrap();
+        let mut buffered = BufReader::new(f);
+        let mut p = Pack::new(None, Some(1024 * 1024 * 20), Some(tmp));
+        p.decode(&mut buffered, None).unwrap();
+    }
+
+    #[test]
+    #[ignore]
+    /// parallel decoding is not implemented yet
+    fn test_pack_decode_multi_task_with_large_file_with_delta_without_ref() {
+        // unimplemented!()
+        let task1 = std::thread::spawn(|| {
+            test_pack_decode_with_large_file_with_delta_without_ref();
+        });
+        let task2 = std::thread::spawn(|| {
+            test_pack_decode_with_large_file_with_delta_without_ref();
+        });
+
+        task1.join().unwrap();
+        task2.join().unwrap();
+    }
+}
diff --git a/pack-rs/waitlist.rs b/pack-rs/waitlist.rs
new file mode 100644
index 0000000000000000000000000000000000000000..cc9eee6f48054944fff5eb2c67ef7669198482b8
--- /dev/null
+++ b/pack-rs/waitlist.rs
@@ -0,0 +1,38 @@
+use dashmap::DashMap;
+use venus::hash::SHA1;
+use crate::internal::pack::cache_object::CacheObject;
+
+/// Waitlist for delta objects whose base object is not ready yet.
+/// Easier and faster than channels.
+#[derive(Default, Debug)]
+pub struct Waitlist { // TODO: memory control!
+    pub map_offset: DashMap<usize, Vec<CacheObject>>,
+    pub map_ref: DashMap<SHA1, Vec<CacheObject>>,
+}
+
+impl Waitlist {
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    pub fn insert_offset(&self, offset: usize, obj: CacheObject) {
+        self.map_offset.entry(offset).or_default().push(obj);
+    }
+
+    pub fn insert_ref(&self, hash: SHA1, obj: CacheObject) {
+        self.map_ref.entry(hash).or_default().push(obj);
+    }
+
+    /// Takes objects out (get & remove).
+    /// Returns an empty Vec if there are none.
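+    /// A minimal sketch (hypothetical offset; drains both maps):
+    /// ```ignore
+    /// let wl = Waitlist::new();
+    /// wl.insert_offset(1234, CacheObject::default());
+    /// let pending = wl.take(1234, SHA1::default());
+    /// assert_eq!(pending.len(), 1);
+    /// ```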
+    pub fn take(&self, offset: usize, hash: SHA1) -> Vec<CacheObject> {
+        let mut res = Vec::new();
+        if let Some((_, vec)) = self.map_offset.remove(&offset) {
+            res.extend(vec);
+        }
+        if let Some((_, vec)) = self.map_ref.remove(&hash) {
+            res.extend(vec);
+        }
+        res
+    }
+}
\ No newline at end of file