From 05a72cf6a239373e32a21e965c31db76aea42afd Mon Sep 17 00:00:00 2001 From: Tiga Ultraman Date: Mon, 14 Oct 2024 16:42:41 +0800 Subject: [PATCH] cherry pick c72235f from https://gitee.com/tiga-ultraman/commonlibrary_rust_ylong_http/pulls/167 add http1 max conn num Signed-off-by: Tiga Ultraman --- ylong_http_client/src/async_impl/client.rs | 17 ++++ ylong_http_client/src/async_impl/pool.rs | 75 +++++++++++++----- ylong_http_client/src/lib.rs | 2 + ylong_http_client/src/util/config/http.rs | 34 ++++++++ ylong_http_client/src/util/dispatcher.rs | 77 ++++++++++++++++++- ylong_http_client/src/util/pool.rs | 10 +-- ylong_http_client/tests/common/async_utils.rs | 1 + .../tests/tcp_server/async_utils.rs | 2 +- 8 files changed, 186 insertions(+), 32 deletions(-) diff --git a/ylong_http_client/src/async_impl/client.rs b/ylong_http_client/src/async_impl/client.rs index b505020..267fdd0 100644 --- a/ylong_http_client/src/async_impl/client.rs +++ b/ylong_http_client/src/async_impl/client.rs @@ -445,6 +445,22 @@ impl ClientBuilder { self } + /// Set the maximum number of http1 connections allowed. + /// + /// By default, the maximum number of http1 connections allowed is 6. + /// + /// # Examples + /// + /// ``` + /// use ylong_http_client::async_impl::ClientBuilder; + /// + /// let builder = ClientBuilder::new().max_h1_conn_number(5); + /// ``` + pub fn max_h1_conn_number(mut self, number: usize) -> Self { + self.http.http1_config.set_max_conn_num(number); + self + } + /// Adds a `Interceptor` to the `Client`. /// /// # Examples @@ -1324,6 +1340,7 @@ HJMRZVCQpSMzvHlofHSNgzWV1MX5h1CP4SGZdBDTfA== ); let client = Client::builder() .connect_timeout(Timeout::from_secs(2)) + .max_h1_conn_number(10) .http1_only() .build() .unwrap(); diff --git a/ylong_http_client/src/async_impl/pool.rs b/ylong_http_client/src/async_impl/pool.rs index c84cfda..4f3c57a 100644 --- a/ylong_http_client/src/async_impl/pool.rs +++ b/ylong_http_client/src/async_impl/pool.rs @@ -35,6 +35,7 @@ use crate::util::config::H2Config; #[cfg(feature = "http3")] use crate::util::config::H3Config; use crate::util::config::{HttpConfig, HttpVersion}; +use crate::util::dispatcher::http1::{WrappedSemPermit, WrappedSemaphore}; use crate::util::dispatcher::{Conn, ConnDispatcher, Dispatcher, TimeInfoConn}; use crate::util::pool::{Pool, PoolKey}; #[cfg(feature = "http3")] @@ -74,9 +75,9 @@ impl ConnPool { #[cfg(feature = "http3")] let alt_svc = self.alt_svcs.get_alt_svcs(&key); - + let default_conn = Conns::new(self.config.http1_config.max_conn_num()); self.pool - .get(key, Conns::new) + .get(key, default_conn) .conn( self.config.clone(), self.connector.clone(), @@ -93,7 +94,13 @@ impl ConnPool { } } +pub(crate) enum H1ConnOption { + Some(T), + None(WrappedSemPermit), +} + pub(crate) struct Conns { + usable: WrappedSemaphore, list: Arc>>>, #[cfg(feature = "http2")] h2_conn: Arc>>>, @@ -102,8 +109,10 @@ pub(crate) struct Conns { } impl Conns { - fn new() -> Self { + fn new(max_conn_num: usize) -> Self { Self { + usable: WrappedSemaphore::new(max_conn_num), + list: Arc::new(Mutex::new(Vec::new())), #[cfg(feature = "http2")] @@ -120,6 +129,7 @@ impl Conns { impl Clone for Conns { fn clone(&self) -> Self { Self { + usable: self.usable.clone(), list: self.list.clone(), #[cfg(feature = "http2")] @@ -180,15 +190,18 @@ impl Conns where C: Connector, { - if let Some(conn) = self.exist_h1_conn() { - return Ok(TimeInfoConn::new(conn, TimeGroup::default())); + let semaphore = self.usable.acquire().await; + match self.exist_h1_conn(semaphore) { + H1ConnOption::Some(conn) => Ok(TimeInfoConn::new(conn, TimeGroup::default())), + H1ConnOption::None(permit) => { + let stream = connector.connect(url, HttpVersion::Http1).await?; + let time_group = take(stream.conn_data().time_group_mut()); + + let dispatcher = ConnDispatcher::http1(stream); + let conn = self.dispatch_h1_conn(dispatcher, permit); + Ok(TimeInfoConn::new(conn, time_group)) + } } - let stream = connector.connect(url, HttpVersion::Http1).await?; - let time_group = take(stream.conn_data().time_group_mut()); - - let dispatcher = ConnDispatcher::http1(stream); - let conn = self.dispatch_h1_conn(dispatcher); - Ok(TimeInfoConn::new(conn, time_group)) } #[cfg(feature = "http2")] @@ -274,10 +287,13 @@ impl Conns if let Some(conn) = Self::exist_h2_conn(&mut lock) { return Ok(TimeInfoConn::new(conn, TimeGroup::default())); } - - if let Some(conn) = self.exist_h1_conn() { - return Ok(TimeInfoConn::new(conn, TimeGroup::default())); - } + let permit = self.usable.acquire().await; + let permit = match self.exist_h1_conn(permit) { + H1ConnOption::Some(conn) => { + return Ok(TimeInfoConn::new(conn, TimeGroup::default())); + } + H1ConnOption::None(permit) => permit, + }; let stream = connector.connect(url, HttpVersion::Negotiate).await?; let mut data = stream.conn_data(); let time_group = take(data.time_group_mut()); @@ -287,7 +303,7 @@ impl Conns } else { let dispatcher = ConnDispatcher::http1(stream); return Ok(TimeInfoConn::new( - self.dispatch_h1_conn(dispatcher), + self.dispatch_h1_conn(dispatcher, permit), time_group, )); }; @@ -295,13 +311,15 @@ impl Conns if protocol == b"http/1.1" { let dispatcher = ConnDispatcher::http1(stream); Ok(TimeInfoConn::new( - self.dispatch_h1_conn(dispatcher), + self.dispatch_h1_conn(dispatcher, permit), time_group, )) } else if protocol == b"h2" { + permit.release(); let conn = Self::dispatch_h2_conn(data.detail(), h2_config, stream, &mut lock); Ok(TimeInfoConn::new(conn, time_group)) } else { + permit.release(); err_from_msg!(Connect, "Alpn negotiate a wrong protocol version.") } } @@ -361,11 +379,20 @@ impl Conns None } - fn dispatch_h1_conn(&self, dispatcher: ConnDispatcher) -> Conn { + fn dispatch_h1_conn(&self, dispatcher: ConnDispatcher, permit: WrappedSemPermit) -> Conn { // We must be able to get the `Conn` here. - let conn = dispatcher.dispatch().unwrap(); + let mut conn = dispatcher.dispatch().unwrap(); let mut list = self.list.lock().unwrap(); list.push(dispatcher); + #[cfg(any(feature = "http2", feature = "http3"))] + if let Conn::Http1(ref mut h1) = conn { + h1.occupy_sem(permit) + } + #[cfg(all(not(feature = "http2"), not(feature = "http3")))] + { + let Conn::Http1(ref mut h1) = conn; + h1.occupy_sem(permit) + } conn } @@ -396,7 +423,7 @@ impl Conns conn } - fn exist_h1_conn(&self) -> Option> { + fn exist_h1_conn(&self, permit: WrappedSemPermit) -> H1ConnOption> { let mut list = self.list.lock().unwrap(); let mut conn = None; let curr = take(&mut *list); @@ -411,7 +438,13 @@ impl Conns } list.push(dispatcher); } - conn + match conn { + Some(Conn::Http1(mut h1)) => { + h1.occupy_sem(permit); + H1ConnOption::Some(Conn::Http1(h1)) + } + _ => H1ConnOption::None(permit), + } } #[cfg(feature = "http2")] diff --git a/ylong_http_client/src/lib.rs b/ylong_http_client/src/lib.rs index 41340fd..f4bbed2 100644 --- a/ylong_http_client/src/lib.rs +++ b/ylong_http_client/src/lib.rs @@ -84,6 +84,7 @@ pub(crate) mod runtime { pub(crate) use tokio::{ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}, net::TcpStream, + sync::{OwnedSemaphorePermit as SemaphorePermit, Semaphore}, task::JoinHandle, time::{sleep, timeout, Sleep}, }; @@ -91,6 +92,7 @@ pub(crate) mod runtime { pub(crate) use ylong_runtime::{ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}, net::TcpStream, + sync::Semaphore, task::JoinHandle, time::{sleep, timeout, Sleep}, }; diff --git a/ylong_http_client/src/util/config/http.rs b/ylong_http_client/src/util/config/http.rs index 2b53a2e..abc5f26 100644 --- a/ylong_http_client/src/util/config/http.rs +++ b/ylong_http_client/src/util/config/http.rs @@ -20,6 +20,9 @@ use crate::ErrorKind; pub(crate) struct HttpConfig { pub(crate) version: HttpVersion, + #[cfg(feature = "http1_1")] + pub(crate) http1_config: http1::H1Config, + #[cfg(feature = "http2")] pub(crate) http2_config: http2::H2Config, @@ -33,6 +36,9 @@ impl HttpConfig { Self { version: HttpVersion::Negotiate, + #[cfg(feature = "http1_1")] + http1_config: http1::H1Config::default(), + #[cfg(feature = "http2")] http2_config: http2::H2Config::new(), @@ -86,6 +92,34 @@ impl TryFrom<&[u8]> for HttpVersion { } } +#[cfg(feature = "http1_1")] +pub(crate) mod http1 { + const DEFAULT_MAX_CONN_NUM: usize = 6; + + #[derive(Clone)] + pub(crate) struct H1Config { + max_conn_num: usize, + } + + impl H1Config { + pub(crate) fn set_max_conn_num(&mut self, num: usize) { + self.max_conn_num = num + } + + pub(crate) fn max_conn_num(&self) -> usize { + self.max_conn_num + } + } + + impl Default for H1Config { + fn default() -> Self { + Self { + max_conn_num: DEFAULT_MAX_CONN_NUM, + } + } + } +} + #[cfg(feature = "http2")] pub(crate) mod http2 { const DEFAULT_MAX_FRAME_SIZE: u32 = 16 * 1024; diff --git a/ylong_http_client/src/util/dispatcher.rs b/ylong_http_client/src/util/dispatcher.rs index 5942f8e..bcf85f5 100644 --- a/ylong_http_client/src/util/dispatcher.rs +++ b/ylong_http_client/src/util/dispatcher.rs @@ -133,6 +133,9 @@ pub(crate) mod http1 { use std::sync::Arc; use super::{ConnDispatcher, Dispatcher}; + use crate::runtime::Semaphore; + #[cfg(feature = "tokio_base")] + use crate::runtime::SemaphorePermit; impl ConnDispatcher { pub(crate) fn http1(io: S) -> Self { @@ -178,9 +181,7 @@ pub(crate) mod http1 { .occupied .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) .ok() - .map(|_| Http1Conn { - inner: self.inner.clone(), - }) + .map(|_| Http1Conn::from_inner(self.inner.clone())) } fn is_shutdown(&self) -> bool { @@ -194,10 +195,19 @@ pub(crate) mod http1 { /// Handle returned to other threads for I/O operations. pub(crate) struct Http1Conn { + pub(crate) sem: Option, pub(crate) inner: Arc>, } impl Http1Conn { + pub(crate) fn from_inner(inner: Arc>) -> Self { + Self { sem: None, inner } + } + + pub(crate) fn occupy_sem(&mut self, sem: WrappedSemPermit) { + self.sem = Some(sem); + } + pub(crate) fn raw_mut(&mut self) -> &mut S { // SAFETY: In the case of `HTTP1`, only one coroutine gets the handle // at the same time. @@ -214,6 +224,67 @@ pub(crate) mod http1 { self.inner.occupied.store(false, Ordering::Release) } } + + pub(crate) struct WrappedSemaphore { + sem: Arc, + } + + impl WrappedSemaphore { + pub(crate) fn new(permits: usize) -> Self { + Self { + #[cfg(feature = "tokio_base")] + sem: Arc::new(tokio::sync::Semaphore::new(permits)), + #[cfg(feature = "ylong_base")] + sem: Arc::new(ylong_runtime::sync::Semaphore::new(permits).unwrap()), + } + } + + pub(crate) async fn acquire(&self) -> WrappedSemPermit { + #[cfg(feature = "ylong_base")] + { + let semaphore = self.sem.clone(); + let _permit = semaphore.acquire().await.unwrap(); + WrappedSemPermit { sem: semaphore } + } + + #[cfg(feature = "tokio_base")] + { + let permit = self.sem.clone().acquire_owned().await.unwrap(); + WrappedSemPermit { permit } + } + } + } + + impl Clone for WrappedSemaphore { + fn clone(&self) -> Self { + Self { + sem: self.sem.clone(), + } + } + } + + pub(crate) struct WrappedSemPermit { + #[cfg(feature = "ylong_base")] + pub(crate) sem: Arc, + #[cfg(feature = "tokio_base")] + #[allow(dead_code)] + pub(crate) permit: SemaphorePermit, + } + + #[cfg(all(feature = "http2", feature = "http1_1"))] + impl WrappedSemPermit { + pub(crate) fn release(&self) { + #[cfg(feature = "ylong_base")] + self.sem.release(); + } + } + + #[cfg(feature = "ylong_base")] + impl Drop for WrappedSemPermit { + fn drop(&mut self) { + self.sem.release(); + } + } } #[cfg(feature = "http2")] diff --git a/ylong_http_client/src/util/pool.rs b/ylong_http_client/src/util/pool.rs index 767f783..2c0fcbb 100644 --- a/ylong_http_client/src/util/pool.rs +++ b/ylong_http_client/src/util/pool.rs @@ -33,14 +33,11 @@ impl Pool { } impl Pool { - pub(crate) fn get(&self, key: K, create_fn: F) -> V - where - F: FnOnce() -> V, - { + pub(crate) fn get(&self, key: K, default: V) -> V { let mut inner = self.pool.lock().unwrap(); match (*inner).entry(key) { Entry::Occupied(conns) => conns.get().clone(), - Entry::Vacant(e) => e.insert(create_fn()).clone(), + Entry::Vacant(e) => e.insert(default).clone(), } } } @@ -74,9 +71,8 @@ mod ut_pool { uri.authority().unwrap().clone(), ); let data = String::from("Data info"); - let consume_and_return_data = move || data; let pool = Pool::new(); - let res = pool.get(key, consume_and_return_data); + let res = pool.get(key, data); assert_eq!(res, "Data info".to_string()); } } diff --git a/ylong_http_client/tests/common/async_utils.rs b/ylong_http_client/tests/common/async_utils.rs index dc2fbbd..4a50c4f 100644 --- a/ylong_http_client/tests/common/async_utils.rs +++ b/ylong_http_client/tests/common/async_utils.rs @@ -221,6 +221,7 @@ macro_rules! async_client_assert { },)* ) => {{ let client = ylong_http_client::async_impl::Client::builder() + .max_h1_conn_number(10) .tls_ca_file($ca_file) .danger_accept_invalid_hostnames(true) .build() diff --git a/ylong_http_client/tests/tcp_server/async_utils.rs b/ylong_http_client/tests/tcp_server/async_utils.rs index 2e23609..d18fb0e 100644 --- a/ylong_http_client/tests/tcp_server/async_utils.rs +++ b/ylong_http_client/tests/tcp_server/async_utils.rs @@ -124,7 +124,7 @@ macro_rules! async_client_assert_on_tcp { Body: $resp_body: expr, },)* ) => {{ - let client = ylong_http_client::async_impl::Client::new(); + let client = ylong_http_client::async_impl::Client::builder().max_h1_conn_number(10).build().unwrap(); let client = std::sync::Arc::new(client); for _i in 0..$server_num { let handle = $handle_vec.pop().expect("No more handles !"); -- Gitee