diff --git a/ylong_http_client/src/async_impl/dns/resolver.rs b/ylong_http_client/src/async_impl/dns/resolver.rs index 86be5c0c9bedae5c38b36148bfbb511389d9d0cc..ab021bbd255f7fe281ff8e0d2a07f71189acc632 100644 --- a/ylong_http_client/src/async_impl/dns/resolver.rs +++ b/ylong_http_client/src/async_impl/dns/resolver.rs @@ -13,29 +13,36 @@ //! `Resolver` trait and `DefaultDnsResolver` implementation. +use std::collections::HashMap; use std::future::Future; use std::io; use std::io::Error; use std::net::{SocketAddr, ToSocketAddrs}; use std::pin::Pin; +use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; use std::vec::IntoIter; use crate::runtime::JoinHandle; +const DEFAULT_TTL: Duration = Duration::from_secs(60); +const MAX_ENTRIES_LEN: usize = 30000; + /// `SocketAddr` resolved by `Resolver`. pub type Addrs = Box + Sync + Send>; /// Possible errors that this resolver may generate when attempting to /// resolve. pub type StdError = Box; /// Futures generated by this resolve when attempting to resolve an address. -pub type SocketFuture = Pin> + Sync + Send>>; +pub type SocketFuture<'a> = + Pin> + Sync + Send + 'a>>; /// `Resolver` trait used by `async_impl::connector::HttpConnector`. `Resolver` /// provides asynchronous dns resolve interfaces. pub trait Resolver: Send + Sync + 'static { - /// resolve domain to a `SocketAddr` `Future`. - fn resolve(&self, domain: &str) -> SocketFuture; + /// resolve authority to a `SocketAddr` `Future`. + fn resolve(&self, authority: &str) -> SocketFuture; } /// `SocketAddr` resolved by `DefaultDnsResolver`. @@ -87,18 +94,341 @@ impl Future for DefaultDnsFuture { } } -/// Default dns resolver used by the `Client`。 +/// Default dns resolver used by the `Client`. +/// DefaultDnsResolver provides DNS resolver with caching machanism. +pub struct DefaultDnsResolver { + manager: DnsManager, // Manages DNS cache + connector: DnsConnector, // Performing DNS resolution + ttl: Duration, // Time-to-live for the DNS cache +} + +impl Default for DefaultDnsResolver { + // Default constructor for `DefaultDnsResolver`, with a default TTL of 60 + // seconds. + fn default() -> Self { + DefaultDnsResolver { + manager: DnsManager::default(), + connector: DnsConnector {}, + ttl: DEFAULT_TTL, // Default TTL set to 60 seconds + } + } +} + +impl DefaultDnsResolver { + /// Create a new DefaultDnsResolver. And TTL is Time to live for cache. + /// + /// # Examples + /// + /// ``` + /// use std::time::Duration; + /// + /// use ylong_http_client::async_impl::DefaultDnsResolver; + /// + /// let res = DefaultDnsResolver::new(Duration::from_secs(1)); + /// ``` + pub fn new(ttl: Duration) -> Self { + DefaultDnsResolver { + manager: DnsManager::new(), + connector: DnsConnector {}, + ttl, // Set TTL through the passed parameters + } + } +} + #[derive(Default)] -pub struct DefaultDnsResolver {} +struct DnsManager { + // Cache storing authority and DNS results + map: Mutex>, +} + +impl DnsManager { + // Creates a new `DnsManager` instance with an empty cache + fn new() -> Self { + DnsManager { + map: Mutex::new(HashMap::new()), + } + } + + // Cleans expired DNS cache entries by retaining only valid ones + fn clean_expired_entries(&self) { + let mut map_lock = self.map.lock().unwrap(); + if map_lock.len() > MAX_ENTRIES_LEN { + map_lock.retain(|_, result| result.inner.lock().unwrap().is_valid()); + } + } +} + +struct DnsResult { + inner: Arc>, +} + +impl DnsResult { + // Creates a new DNS result with the given addresses and expiration time + fn new(addr: Vec, expiration_time: Instant) -> Self { + DnsResult { + inner: Arc::new(Mutex::new(DnsResultInner { + addr, + expiration_time, + })), + } + } +} + +#[derive(Clone)] +struct DnsResultInner { + addr: Vec, // List of resolved addresses for the authority + expiration_time: Instant, // Expiration time for the cache entry +} + +impl DnsResultInner { + // Checks if the DNS result is still valid + fn is_valid(&self) -> bool { + self.expiration_time > Instant::now() + } +} + +impl Default for DnsResultInner { + // Default constructor for `DnsResultInner`, with an empty address list and 60 + // seconds expiration + fn default() -> Self { + DnsResultInner { + addr: vec![], + expiration_time: Instant::now() + Duration::from_secs(60), + } + } +} + +struct DnsConnector {} + +impl DnsConnector { + // Resolves the authority to a list of socket addresses + fn get_socket_addrs(&self, authority: &str) -> Result, io::Error> { + authority + .to_socket_addrs() + .map(|addrs| addrs.collect()) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) + } +} impl Resolver for DefaultDnsResolver { - fn resolve(&self, domain: &str) -> SocketFuture { - let domain: Box = domain.into(); - let blocking = crate::runtime::spawn_blocking(move || { - (*domain) - .to_socket_addrs() - .map(|iter| ResolvedAddrs { iter }) - }); - Box::pin(DefaultDnsFuture { inner: blocking }) + fn resolve(&self, authority: &str) -> SocketFuture { + let authority = authority.to_string(); + self.manager.clean_expired_entries(); + Box::pin(async move { + let mut map_lock = self.manager.map.lock().unwrap(); + if let Some(addrs) = map_lock.get(&authority) { + let lock_inner = addrs.inner.lock().unwrap(); + if lock_inner.is_valid() { + return Ok(Box::new(lock_inner.addr.clone().into_iter()) as Addrs); + } + } + match self.connector.get_socket_addrs(&authority) { + Ok(addrs) => { + let dns_result = DnsResult::new(addrs.clone(), Instant::now() + self.ttl); + map_lock.insert(authority, dns_result); + Ok(Box::new(addrs.into_iter()) as Addrs) + } + Err(err) => Err(Box::new(err) as StdError), + } + }) + } +} + +#[cfg(feature = "tokio_base")] +#[cfg(test)] +mod ut_dns_cache { + use std::sync::Arc; + use std::time::{Duration, Instant}; + + use tokio::sync::Mutex; + + use super::*; + + /// UT test cases for `DefaultDnsResolver::resolve`. + /// + /// # Brief + /// 1. Test that DNS resolution is correctly cached after the first + /// resolution. + /// 2. Ensure that a second resolution within TTL returns the same result + /// and is faster. + /// 3. Check that after TTL expiration, the resolver performs another + /// resolution and returns fresh results and is slower that second one. + #[tokio::test] + async fn ut_dns_cache_test_async() { + let resolver = DefaultDnsResolver::new(Duration::from_millis(100)); + let domain = "example.com:0"; + let start = Instant::now(); + let addrs1 = resolver.resolve(domain).await; + let duration1 = start.elapsed(); + assert!(addrs1.is_ok()); + tokio::time::sleep(Duration::from_millis(80)).await; + let start = Instant::now(); + let addrs2 = resolver.resolve(domain).await; + let duration2 = start.elapsed(); + assert!(addrs2.is_ok()); + if let (Ok(addrs1), Ok(addrs2)) = (addrs1, addrs2) { + let addrs1_vec: Vec = addrs1.collect(); + let addrs2_vec: Vec = addrs2.collect(); + assert_eq!(addrs1_vec, addrs2_vec); + } + assert!(duration1 > duration2); + tokio::time::sleep(Duration::from_millis(80)).await; + let start = Instant::now(); + let _addrs3 = resolver.resolve(domain).await; + let duration3 = start.elapsed(); + assert!(duration3 > duration2); + } + + /// UT test cases for `DefaultDnsResolver::resolve` under multiple + /// concurrent requests. + /// + /// # Brief + /// 1. Test that multiple concurrent DNS resolution requests return the same + /// result. + /// 2. Ensure that only the first request performs the resolution and others + /// wait for the result. + /// 3. Check that subsequent requests are faster, as they use the cached + /// result. + #[tokio::test] + async fn ut_dns_cache_test_multi_async() { + let domain = "example.com:0"; + let resolver = Arc::new(Mutex::new(DefaultDnsResolver::new(Duration::from_millis( + 500, + )))); + let first_duration = Arc::new(Mutex::new(None::)); + let first_addrs = Arc::new(Mutex::new(None::>)); + let mut handles = vec![]; + for _ in 0..3 { + let resolver = Arc::clone(&resolver); + let first_duration = Arc::clone(&first_duration); + let first_addrs = Arc::clone(&first_addrs); + let handle = tokio::spawn(async move { + let resolver = resolver.lock().await; + let start = Instant::now(); + let addrs = resolver.resolve(domain).await; + let duration = start.elapsed(); + assert!(addrs.is_ok()); + let addrs = addrs.unwrap().collect::>(); + let mut first_duration_locked = first_duration.lock().await; + if first_duration_locked.is_none() { + *first_duration_locked = Some(duration); + let mut first_addrs_locked = first_addrs.lock().await; + *first_addrs_locked = Some(addrs); + } else { + let first_duration_locked = first_duration_locked.as_ref().unwrap(); + assert!(*first_duration_locked > duration); + + let first_addrs_locked = first_addrs.lock().await; + assert!(*first_addrs_locked.as_ref().unwrap() == addrs); + } + }); + handles.push(handle); + } + for handle in handles { + handle.await.unwrap(); + } + } +} + +#[cfg(feature = "ylong_base")] +#[cfg(test)] +mod ut_dns_cache { + use std::sync::Arc; + use std::time::{Duration, Instant}; + + use ylong_runtime::sync::Mutex; + + use super::*; + + /// UT test cases for `DefaultDnsResolver::resolve`. + /// + /// # Brief + /// 1. Test that DNS resolution is correctly cached after the first + /// resolution. + /// 2. Ensure that a second resolution within TTL returns the same result + /// and is faster. + /// 3. Check that after TTL expiration, the resolver performs another + /// resolution and returns fresh results and is slower that second one. + #[test] + fn ut_dns_cache_test() { + ylong_runtime::block_on(ut_dns_cache_test_async()); + } + + async fn ut_dns_cache_test_async() { + let resolver = DefaultDnsResolver::new(Duration::from_millis(100)); + let domain = "example.com:0"; + let start = Instant::now(); + let addrs1 = resolver.resolve(domain).await; + let duration1 = start.elapsed(); + assert!(addrs1.is_ok()); + ylong_runtime::time::sleep(Duration::from_millis(80)).await; + let start = Instant::now(); + let addrs2 = resolver.resolve(domain).await; + let duration2 = start.elapsed(); + assert!(addrs2.is_ok()); + if let (Ok(addrs1), Ok(addrs2)) = (addrs1, addrs2) { + let addrs1_vec: Vec = addrs1.collect(); + let addrs2_vec: Vec = addrs2.collect(); + assert_eq!(addrs1_vec, addrs2_vec); + } + assert!(duration1 > duration2); + ylong_runtime::time::sleep(Duration::from_millis(80)).await; + let start = Instant::now(); + let _addrs3 = resolver.resolve(domain).await; + let duration3 = start.elapsed(); + assert!(duration3 > duration2); + } + + /// UT test cases for `DefaultDnsResolver::resolve` under multiple + /// concurrent requests. + /// + /// # Brief + /// 1. Test that multiple concurrent DNS resolution requests return the same + /// result. + /// 2. Ensure that only the first request performs the resolution and others + /// wait for the result. + /// 3. Check that subsequent requests are faster, as they use the cached + /// result. + #[test] + fn ut_dns_cache_test_multi() { + ylong_runtime::block_on(ut_dns_cache_test_multi_async()); + } + + async fn ut_dns_cache_test_multi_async() { + let domain = "example.com:0"; + let resolver = Arc::new(Mutex::new(DefaultDnsResolver::new(Duration::from_millis( + 500, + )))); + let first_duration = Arc::new(Mutex::new(None::)); + let first_addrs = Arc::new(Mutex::new(None::>)); + let mut handles = Vec::new(); + for _ in 0..3 { + let resolver = Arc::clone(&resolver); + let first_duration = Arc::clone(&first_duration); + let first_addrs = Arc::clone(&first_addrs); + let handle = ylong_runtime::spawn(async move { + let resolver = resolver.lock().await; + let start = Instant::now(); + let addrs = resolver.resolve(domain).await; + let duration = start.elapsed(); + assert!(addrs.is_ok()); + let addrs = addrs.unwrap().collect::>(); + let mut first_duration_locked = first_duration.lock().await; + if first_duration_locked.is_none() { + *first_duration_locked = Some(duration); + let mut first_addrs_locked = first_addrs.lock().await; + *first_addrs_locked = Some(addrs); + } else { + let first_duration_locked = first_duration_locked.as_ref().unwrap(); + assert!(*first_duration_locked > duration); + let first_addrs_locked = first_addrs.lock().await; + assert!(*first_addrs_locked.as_ref().unwrap() == addrs); + } + }); + handles.push(handle); + } + for handle in handles { + handle.await.unwrap(); + } } } diff --git a/ylong_http_client/src/lib.rs b/ylong_http_client/src/lib.rs index 0c650b8d5239525301f3b27ef3e2c530fb35d66f..41340fd1e00922f4b840b3cb444f50260504650c 100644 --- a/ylong_http_client/src/lib.rs +++ b/ylong_http_client/src/lib.rs @@ -84,14 +84,13 @@ pub(crate) mod runtime { pub(crate) use tokio::{ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}, net::TcpStream, - task::{spawn_blocking, JoinHandle}, + task::JoinHandle, time::{sleep, timeout, Sleep}, }; #[cfg(feature = "ylong_base")] pub(crate) use ylong_runtime::{ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}, net::TcpStream, - spawn_blocking, task::JoinHandle, time::{sleep, timeout, Sleep}, }; diff --git a/ylong_http_client/tests/sdv_async_custom_dns_resolver.rs b/ylong_http_client/tests/sdv_async_custom_dns_resolver.rs index 144992e1c709871f3f2164ad06ea780b18cf4d42..241d157d71aa936dfe3350f970e19d86c067f5fe 100644 --- a/ylong_http_client/tests/sdv_async_custom_dns_resolver.rs +++ b/ylong_http_client/tests/sdv_async_custom_dns_resolver.rs @@ -57,7 +57,7 @@ fn sdv_client_custom_dns_resolver() { let handle = handles_vec.pop().expect("No more handles !"); - let resolver = DefaultDnsResolver {}; + let resolver = DefaultDnsResolver::default(); let client = Client::builder().dns_resolver(resolver).build().unwrap(); let shutdown_handle = ylong_runtime::spawn(async move {