diff --git a/ylong_http_client/src/async_impl/client.rs b/ylong_http_client/src/async_impl/client.rs index 2711f6be4c1e4ff57ce5c5340a194ee346664de5..2a309cb5501c2b60f38bd5541d8489253a180743 100644 --- a/ylong_http_client/src/async_impl/client.rs +++ b/ylong_http_client/src/async_impl/client.rs @@ -11,11 +11,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use ylong_http::request::uri::Uri; use super::pool::ConnPool; use super::timeout::TimeoutFuture; use super::{conn, Body, Connector, HttpConnector, Request, Response}; +use crate::async_impl::interceptor::{IdleInterceptor, Interceptor, Interceptors}; +use crate::async_impl::request::Message; use crate::error::HttpClientError; use crate::runtime::timeout; #[cfg(feature = "__c_openssl")] @@ -64,6 +68,7 @@ use crate::Retry; pub struct Client { inner: ConnPool, config: ClientConfig, + interceptors: Arc, } impl Client { @@ -113,6 +118,7 @@ impl Client { Self { inner: ConnPool::new(HttpConfig::default(), connector), config: ClientConfig::default(), + interceptors: Arc::new(IdleInterceptor), } } @@ -137,11 +143,14 @@ impl Client { let mut retries = self.config.retry.times().unwrap_or(0); loop { let response = self.send_request(&mut request).await; - // Only bodies which are reusable can be retried. - if response.is_ok() || retries == 0 || !request.body_mut().reuse() { - return response; + if let Err(ref err) = response { + if retries > 0 && request.body_mut().reuse() { + self.interceptors.intercept_retry(err)?; + retries -= 1; + continue; + } } - retries -= 1; + return response; } } } @@ -178,10 +187,14 @@ impl Client { conn: Conn, request: &mut Request, ) -> Result { + let message = Message { + request, + interceptor: Arc::clone(&self.interceptors), + }; if let Some(timeout) = self.config.request_timeout.inner() { - TimeoutFuture::new(conn::request(conn, request), timeout).await + TimeoutFuture::new(conn::request(conn, message), timeout).await } else { - conn::request(conn, request).await + conn::request(conn, message).await } } @@ -204,9 +217,14 @@ impl Client { if !request.body_mut().reuse() { *request.body_mut() = Body::empty(); } + self.interceptors.intercept_redirect_request(request)?; response = self.send_unformatted_request(request).await?; + self.interceptors.intercept_redirect_response(&response)?; + } + Trigger::Stop => { + self.interceptors.intercept_response(&response)?; + return Ok(response); } - Trigger::Stop => return Ok(response), } } } @@ -237,6 +255,8 @@ pub struct ClientBuilder { /// Options and flags that is related to `Proxy`. proxies: Proxies, + interceptors: Arc, + /// Options and flags that is related to `TLS`. #[cfg(feature = "__tls")] tls: crate::util::TlsConfigBuilder, @@ -257,7 +277,7 @@ impl ClientBuilder { http: HttpConfig::default(), client: ClientConfig::default(), proxies: Proxies::default(), - + interceptors: Arc::new(IdleInterceptor), #[cfg(feature = "__tls")] tls: crate::util::TlsConfig::builder(), } @@ -366,6 +386,28 @@ impl ClientBuilder { self } + /// Adds a `Interceptor` to the `Client`. + /// + /// # Examples + /// + /// ``` + /// # use ylong_http_client::async_impl::{ClientBuilder, Interceptor}; + /// # use ylong_http_client::HttpClientError; + /// + /// # fn add_interceptor(interceptor: T) + /// # where T: Interceptor + Sync + Send + 'static, + /// # { + /// let builder = ClientBuilder::new().interceptor(interceptor); + /// # } + /// ``` + pub fn interceptor(mut self, interceptors: T) -> Self + where + T: Interceptor + Sync + Send + 'static, + { + self.interceptors = Arc::new(interceptors); + self + } + /// Constructs a `Client` based on the given settings. /// /// # Examples @@ -387,6 +429,7 @@ impl ClientBuilder { Ok(Client { inner: ConnPool::new(self.http, connector), config: self.client, + interceptors: self.interceptors, }) } } @@ -697,8 +740,6 @@ impl ClientBuilder { /// let builder = ClientBuilder::new().cert_verifier(verifier); /// ``` pub fn cert_verifier(mut self, verifier: T) -> Self { - use std::sync::Arc; - use crate::util::config::tls::DefaultCertVerifier; self.tls = self @@ -716,7 +757,6 @@ impl Default for ClientBuilder { #[cfg(test)] mod ut_async_impl_client { - #[cfg(feature = "ylong_base")] use ylong_runtime::io::AsyncWriteExt; @@ -738,9 +778,12 @@ mod ut_async_impl_client { #[cfg(feature = "ylong_base")] async fn client_request_redirect() { + use std::sync::Arc; + use ylong_http::h1::ResponseDecoder; use ylong_http::response::Response as HttpResponse; + use crate::async_impl::interceptor::IdleInterceptor; use crate::async_impl::{ClientBuilder, HttpBody}; use crate::util::normalizer::BodyLength; use crate::util::Redirect; @@ -751,8 +794,13 @@ mod ut_async_impl_client { let box_stream = Box::new("hello world".as_bytes()); let content_bytes = ""; - let until_close = - HttpBody::new(BodyLength::UntilClose, box_stream, content_bytes.as_bytes()).unwrap(); + let until_close = HttpBody::new( + Arc::new(IdleInterceptor), + BodyLength::UntilClose, + box_stream, + content_bytes.as_bytes(), + ) + .unwrap(); let response = HttpResponse::from_raw_parts(result.0, until_close); let response = Response::new(response); let mut request = Request::builder() diff --git a/ylong_http_client/src/async_impl/conn/http1.rs b/ylong_http_client/src/async_impl/conn/http1.rs index 277f7be7d52eb51b824c25c5f0e0c14ece422457..fd74173dca8853c006883f0ef7f3ad783b743d51 100644 --- a/ylong_http_client/src/async_impl/conn/http1.rs +++ b/ylong_http_client/src/async_impl/conn/http1.rs @@ -22,7 +22,8 @@ use ylong_http::version::Version; use super::StreamData; use crate::async_impl::connector::ConnInfo; -use crate::async_impl::{HttpBody, Request, Response}; +use crate::async_impl::request::Message; +use crate::async_impl::{HttpBody, Response}; use crate::error::HttpClientError; use crate::runtime::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; use crate::util::dispatcher::http1::Http1Conn; @@ -32,22 +33,27 @@ const TEMP_BUF_SIZE: usize = 16 * 1024; pub(crate) async fn request( mut conn: Http1Conn, - request: &mut Request, + message: Message<'_>, ) -> Result where S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static, { + message + .interceptor + .intercept_connection(conn.raw_mut().conn_detail())?; + message.interceptor.intercept_request(message.request)?; let mut buf = vec![0u8; TEMP_BUF_SIZE]; // Encodes and sends Request-line and Headers(non-body fields). - let mut part_encoder = RequestEncoder::new(request.part().clone()); - if conn.raw_mut().is_proxy() && request.uri().scheme() == Some(&Scheme::HTTP) { + let mut part_encoder = RequestEncoder::new(message.request.part().clone()); + if conn.raw_mut().is_proxy() && message.request.uri().scheme() == Some(&Scheme::HTTP) { part_encoder.absolute_uri(true); } loop { match part_encoder.encode(&mut buf[..]) { Ok(0) => break, Ok(written) => { + message.interceptor.intercept_input(&buf[..written])?; // RequestEncoder writes `buf` as much as possible. if let Err(e) = conn.raw_mut().write_all(&buf[..written]).await { conn.shutdown(); @@ -61,7 +67,8 @@ where } } - let content_length = request + let content_length = message + .request .part() .headers .get("Content-Length") @@ -69,7 +76,8 @@ where .and_then(|v| v.parse::().ok()) .is_some(); - let transfer_encoding = request + let transfer_encoding = message + .request .part() .headers .get("Transfer-Encoding") @@ -77,7 +85,7 @@ where .map(|v| v.contains("chunked")) .unwrap_or(false); - let body = request.body_mut(); + let body = message.request.body_mut(); match (content_length, transfer_encoding) { (_, true) => { @@ -110,6 +118,7 @@ where } }; + message.interceptor.intercept_output(&buf[..size])?; match decoder.decode(&buf[..size]) { Ok(None) => {} Ok(Some((part, rem))) => break (part, rem), @@ -152,7 +161,7 @@ where } } - let length = match BodyLengthParser::new(request.method(), &part).parse() { + let length = match BodyLengthParser::new(message.request.method(), &part).parse() { Ok(length) => length, Err(e) => { conn.shutdown(); @@ -160,7 +169,7 @@ where } }; - let body = HttpBody::new(length, Box::new(conn), pre)?; + let body = HttpBody::new(message.interceptor, length, Box::new(conn), pre)?; Ok(Response::new( ylong_http::response::Response::from_raw_parts(part, body), )) diff --git a/ylong_http_client/src/async_impl/conn/http2.rs b/ylong_http_client/src/async_impl/conn/http2.rs index 148d7c851b3c47b1559dcad8d616157b69fbafb6..3feb846ce14ea5168e5c799daca584dfe217519e 100644 --- a/ylong_http_client/src/async_impl/conn/http2.rs +++ b/ylong_http_client/src/async_impl/conn/http2.rs @@ -28,6 +28,7 @@ use ylong_http::response::{Response, ResponsePart}; use crate::async_impl::client::Retryable; use crate::async_impl::conn::HttpBody; +use crate::async_impl::request::Message; use crate::async_impl::StreamData; use crate::error::{ErrorKind, HttpClientError}; use crate::runtime::{AsyncRead, AsyncWrite, ReadBuf}; @@ -37,15 +38,15 @@ const UNUSED_FLAG: u8 = 0x0; pub(crate) async fn request( mut conn: Http2Conn, - request: &mut Request, + message: Message, retryable: &mut Retryable, ) -> Result, HttpClientError> where T: Body, S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static, { - let part = request.part().clone(); - let body = request.body_mut(); + let part = message.request.part().clone(); + let body = message.request.body_mut(); // TODO Due to the reason of the Body structure, the use of the trailer is not // implemented here for the time being, and it needs to be completed after the @@ -78,10 +79,11 @@ where frame_2_response(conn, frame, retryable) } -fn frame_2_response( +fn frame_2_response( conn: Http2Conn, headers_frame: Frame, retryable: &mut Retryable, + message: Message, ) -> Result, HttpClientError> where S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static, @@ -147,7 +149,7 @@ where Some(0) => HttpBody::empty(), Some(size) => { let text_io = TextIo::new(conn); - HttpBody::text(size, &[0u8; 0], Box::new(text_io)) + HttpBody::text(size, &[0u8; 0], Box::new(text_io), message.interceptor) } } } diff --git a/ylong_http_client/src/async_impl/conn/mod.rs b/ylong_http_client/src/async_impl/conn/mod.rs index 6345d8f1c36af31d7cf04524b5894d9c05f62037..1bafb344ee1bc1b24c74e8b4a3e4fc43c7b5e6db 100644 --- a/ylong_http_client/src/async_impl/conn/mod.rs +++ b/ylong_http_client/src/async_impl/conn/mod.rs @@ -18,7 +18,8 @@ mod http1; mod http2; use crate::async_impl::connector::ConnInfo; -use crate::async_impl::{Request, Response}; +use crate::async_impl::request::Message; +use crate::async_impl::Response; use crate::error::HttpClientError; use crate::runtime::{AsyncRead, AsyncWrite}; use crate::util::dispatcher::Conn; @@ -32,16 +33,16 @@ pub(crate) trait StreamData: AsyncRead { pub(crate) async fn request( conn: Conn, - request: &mut Request, + message: Message<'_>, ) -> Result where S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static, { match conn { #[cfg(feature = "http1_1")] - Conn::Http1(http1) => http1::request(http1, request).await, + Conn::Http1(http1) => http1::request(http1, message).await, #[cfg(feature = "http2")] - Conn::Http2(http2) => http2::request(http2, request).await, + Conn::Http2(http2) => http2::request(http2, message).await, } } diff --git a/ylong_http_client/src/async_impl/connector/mod.rs b/ylong_http_client/src/async_impl/connector/mod.rs index 93bcf5d47753d52dba004e7867cda90ccb5a1c6d..6f1cbc8c413e8d2a0371b4b0567ba7f49885bd70 100644 --- a/ylong_http_client/src/async_impl/connector/mod.rs +++ b/ylong_http_client/src/async_impl/connector/mod.rs @@ -75,6 +75,7 @@ mod no_tls { use super::{tcp_stream, Connector, HttpConnector}; use crate::async_impl::connector::stream::HttpStream; + use crate::async_impl::interceptor::{ConnDetail, ConnProtocol}; use crate::runtime::TcpStream; impl Connector for HttpConnector { @@ -92,9 +93,17 @@ mod no_tls { is_proxy = true; } Box::pin(async move { - tcp_stream(&addr) - .await - .map(|stream| HttpStream::new(stream, is_proxy)) + let stream = tcp_stream(&addr).await?; + let local = stream.local_addr()?; + let peer = stream.peer_addr()?; + let detail = ConnDetail { + protocol: ConnProtocol::Tcp, + local, + peer, + addr, + proxy: is_proxy, + }; + Ok(HttpStream::new(stream, detail)) }) } } @@ -112,6 +121,7 @@ mod tls { use super::{tcp_stream, Connector, HttpConnector}; use crate::async_impl::connector::stream::HttpStream; + use crate::async_impl::interceptor::{ConnDetail, ConnProtocol}; use crate::async_impl::ssl_stream::{AsyncSslStream, MixStream}; use crate::runtime::{AsyncReadExt, AsyncWriteExt, TcpStream}; @@ -147,16 +157,25 @@ mod tls { match *uri.scheme().unwrap() { Scheme::HTTP => Box::pin(async move { - Ok(HttpStream::new( - MixStream::Http(tcp_stream(&addr).await?), - is_proxy, - )) + let stream = tcp_stream(&addr).await?; + let local = stream.local_addr()?; + let peer = stream.peer_addr()?; + let detail = ConnDetail { + protocol: ConnProtocol::Tcp, + local, + peer, + addr, + proxy: is_proxy, + }; + + Ok(HttpStream::new(MixStream::Http(stream), detail)) }), Scheme::HTTPS => { let config = self.config.tls.clone(); Box::pin(async move { let mut tcp = tcp_stream(&addr).await?; - + let local = tcp.local_addr()?; + let peer = tcp.peer_addr()?; if is_proxy { tcp = tunnel(tcp, host, port, auth).await?; }; @@ -166,12 +185,19 @@ mod tls { .ssl_new(&host_name) .and_then(|ssl| AsyncSslStream::new(ssl.into_inner(), tcp, pinned_key)) .map_err(|e| Error::new(ErrorKind::Other, e))?; + let detail = ConnDetail { + protocol: ConnProtocol::Tcp, + local, + peer, + addr, + proxy: is_proxy, + }; Pin::new(&mut stream) .connect() .await .map_err(|e| Error::new(ErrorKind::Other, e))?; - Ok(HttpStream::new(MixStream::Https(stream), is_proxy)) + Ok(HttpStream::new(MixStream::Https(stream), detail)) }) } } diff --git a/ylong_http_client/src/async_impl/connector/stream.rs b/ylong_http_client/src/async_impl/connector/stream.rs index 0aed4192700abddc0e59708a0f67f77423f613b3..23989442770e456a9fbfec294ac17d14eac48c83 100644 --- a/ylong_http_client/src/async_impl/connector/stream.rs +++ b/ylong_http_client/src/async_impl/connector/stream.rs @@ -11,23 +11,26 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! `ConnInfo` trait and `HttpStream` implementation. +//! `ConnDetail` trait and `HttpStream` implementation. use std::pin::Pin; use std::task::{Context, Poll}; +use crate::async_impl::interceptor::ConnDetail; use crate::runtime::{AsyncRead, AsyncWrite, ReadBuf}; -/// `ConnInfo` trait, which is used to obtain information about the current +/// `ConnDetail` trait, which is used to obtain information about the current /// connection. pub trait ConnInfo { /// Whether the current connection is a proxy. fn is_proxy(&self) -> bool; + + fn conn_detail(&self) -> ConnDetail; } /// A connection wrapper containing io and io information. pub struct HttpStream { - is_proxy: bool, + detail: ConnDetail, stream: T, } @@ -69,16 +72,17 @@ where impl ConnInfo for HttpStream { fn is_proxy(&self) -> bool { - self.is_proxy + self.detail.proxy + } + + fn conn_detail(&self) -> ConnDetail { + self.detail.clone() } } impl HttpStream { /// HttpStream constructor. - pub fn new(io: T, is_proxy: bool) -> HttpStream { - HttpStream { - is_proxy, - stream: io, - } + pub fn new(io: T, detail: ConnDetail) -> HttpStream { + HttpStream { detail, stream: io } } } diff --git a/ylong_http_client/src/async_impl/downloader/mod.rs b/ylong_http_client/src/async_impl/downloader/mod.rs index 7bb4193abf89edc62f663b49500a910834f0a37e..6a7e3969887beb448396af70959e69a75ab112b0 100644 --- a/ylong_http_client/src/async_impl/downloader/mod.rs +++ b/ylong_http_client/src/async_impl/downloader/mod.rs @@ -253,10 +253,13 @@ impl Default for DownloadConfig { #[cfg(all(test, feature = "ylong_base"))] mod ut_downloader { + use std::sync::Arc; + use ylong_http::h1::ResponseDecoder; use ylong_http::response::Response; use crate::async_impl::conn::StreamData; + use crate::async_impl::interceptor::IdleInterceptor; use crate::async_impl::{Downloader, HttpBody, Response as adpater_resp}; use crate::util::normalizer::BodyLength; @@ -291,8 +294,13 @@ mod ut_downloader { 000; message = last\r\n\ \r\n\ "; - let chunk = - HttpBody::new(BodyLength::Chunk, box_stream, chunk_body_bytes.as_bytes()).unwrap(); + let chunk = HttpBody::new( + Arc::new(IdleInterceptor), + BodyLength::Chunk, + box_stream, + chunk_body_bytes.as_bytes(), + ) + .unwrap(); let mut decoder = ResponseDecoder::new(); let result = decoder.decode(response_str).unwrap().unwrap(); let response = Response::from_raw_parts(result.0, chunk); diff --git a/ylong_http_client/src/async_impl/http_body.rs b/ylong_http_client/src/async_impl/http_body.rs index e2e3f2e5cd5ea5bb3973b72b48a9f625c3710be0..998c4b9afcc57318fb7de3018a1ef25936b3fe06 100644 --- a/ylong_http_client/src/async_impl/http_body.rs +++ b/ylong_http_client/src/async_impl/http_body.rs @@ -15,6 +15,7 @@ use core::pin::Pin; use core::task::{Context, Poll}; use std::future::Future; use std::io::{Cursor, Read}; +use std::sync::Arc; use ylong_http::body::async_impl::Body; use ylong_http::body::TextBodyDecoder; @@ -23,6 +24,7 @@ use ylong_http::body::{ChunkBodyDecoder, ChunkState}; use ylong_http::headers::Headers; use super::conn::StreamData; +use crate::async_impl::interceptor::Interceptors; use crate::error::{ErrorKind, HttpClientError}; use crate::runtime::{AsyncRead, ReadBuf, Sleep}; use crate::util::normalizer::BodyLength; @@ -69,6 +71,7 @@ type BoxStreamData = Box; impl HttpBody { pub(crate) fn new( + interceptors: Arc, body_length: BodyLength, io: BoxStreamData, pre: &[u8], @@ -82,11 +85,11 @@ impl HttpBody { } Kind::Empty } - BodyLength::Length(len) => Kind::Text(Text::new(len, pre, io)), - BodyLength::UntilClose => Kind::UntilClose(UntilClose::new(pre, io)), + BodyLength::Length(len) => Kind::Text(Text::new(len, pre, io, interceptors)), + BodyLength::UntilClose => Kind::UntilClose(UntilClose::new(pre, io, interceptors)), #[cfg(feature = "http1_1")] - BodyLength::Chunk => Kind::Chunk(Chunk::new(pre, io)), + BodyLength::Chunk => Kind::Chunk(Chunk::new(pre, io, interceptors)), }; Ok(Self { kind, sleep: None }) } @@ -100,9 +103,14 @@ impl HttpBody { } #[cfg(feature = "http2")] - pub(crate) fn text(len: usize, pre: &[u8], io: BoxStreamData) -> Self { + pub(crate) fn text( + len: usize, + pre: &[u8], + io: BoxStreamData, + interceptors: Arc, + ) -> Self { Self { - kind: Kind::Text(Text::new(len, pre, io)), + kind: Kind::Text(Text::new(len, pre, io, interceptors)), sleep: None, } } @@ -199,13 +207,15 @@ enum Kind { } struct UntilClose { + interceptors: Arc, pre: Option>>, io: Option, } impl UntilClose { - pub(crate) fn new(pre: &[u8], io: BoxStreamData) -> Self { + pub(crate) fn new(pre: &[u8], io: BoxStreamData, interceptors: Arc) -> Self { Self { + interceptors, pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())), io: Some(io), } @@ -242,6 +252,8 @@ impl UntilClose { if filled == 0 { io.shutdown(); } else { + self.interceptors + .intercept_output(&buf[read..(read + filled)])?; self.io = Some(io); } read += filled; @@ -267,14 +279,21 @@ impl UntilClose { } struct Text { + interceptors: Arc, decoder: TextBodyDecoder, pre: Option>>, io: Option, } impl Text { - pub(crate) fn new(len: usize, pre: &[u8], io: BoxStreamData) -> Self { + pub(crate) fn new( + len: usize, + pre: &[u8], + io: BoxStreamData, + interceptors: Arc, + ) -> Self { Self { + interceptors, decoder: TextBodyDecoder::new(len), pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())), io: Some(io), @@ -336,6 +355,7 @@ impl Text { )); } let (text, rem) = self.decoder.decode(read_buf.filled()); + self.interceptors.intercept_output(read_buf.filled())?; read += filled; // Contains redundant `rem`, return error. match (text.is_complete(), rem.is_empty()) { @@ -369,6 +389,7 @@ impl Text { #[cfg(feature = "http1_1")] struct Chunk { + interceptors: Arc, decoder: ChunkBodyDecoder, pre: Option>>, io: Option, @@ -376,8 +397,9 @@ struct Chunk { #[cfg(feature = "http1_1")] impl Chunk { - pub(crate) fn new(pre: &[u8], io: BoxStreamData) -> Self { + pub(crate) fn new(pre: &[u8], io: BoxStreamData, interceptors: Arc) -> Self { Self { + interceptors, decoder: ChunkBodyDecoder::new().contains_trailer(true), pre: (!pre.is_empty()).then_some(Cursor::new(pre.to_vec())), io: Some(io), @@ -429,6 +451,7 @@ impl Chunk { return Poll::Ready(err_from_msg!(BodyDecode, "Response body incomplete")); } let (size, flag) = self.merge_chunks(read_buf.filled_mut())?; + self.interceptors.intercept_output(read_buf.filled_mut())?; read += size; if flag { // Return if we find a 0-sized chunk. @@ -514,8 +537,11 @@ impl Chunk { #[cfg(feature = "ylong_base")] #[cfg(test)] mod ut_async_http_body { + use std::sync::Arc; + use ylong_http::body::async_impl; + use crate::async_impl::interceptor::IdleInterceptor; use crate::async_impl::HttpBody; use crate::util::normalizer::BodyLength; use crate::ErrorKind; @@ -545,8 +571,13 @@ mod ut_async_http_body { 000; message = last\r\n\ accept:text/html\r\n\r\n\ "; - let mut chunk = - HttpBody::new(BodyLength::Chunk, box_stream, chunk_body_bytes.as_bytes()).unwrap(); + let mut chunk = HttpBody::new( + Arc::new(IdleInterceptor), + BodyLength::Chunk, + box_stream, + chunk_body_bytes.as_bytes(), + ) + .unwrap(); let res = async_impl::Body::trailer(&mut chunk) .await .unwrap() @@ -565,6 +596,7 @@ mod ut_async_http_body { "; let mut chunk = HttpBody::new( + Arc::new(IdleInterceptor), BodyLength::Chunk, box_stream, chunk_body_no_trailer_bytes.as_bytes(), @@ -597,8 +629,13 @@ mod ut_async_http_body { 000; message = last\r\n\ Expires: Wed, 21 Oct 2015 07:27:00 GMT \r\n\r\n\ "; - let mut chunk = - HttpBody::new(BodyLength::Chunk, box_stream, chunk_body_bytes.as_bytes()).unwrap(); + let mut chunk = HttpBody::new( + Arc::new(IdleInterceptor), + BodyLength::Chunk, + box_stream, + chunk_body_bytes.as_bytes(), + ) + .unwrap(); let res = async_impl::Body::trailer(&mut chunk) .await .unwrap() @@ -636,8 +673,13 @@ mod ut_async_http_body { .as_bytes(), ); let chunk_body_bytes = ""; - let mut chunk = - HttpBody::new(BodyLength::Chunk, box_stream, chunk_body_bytes.as_bytes()).unwrap(); + let mut chunk = HttpBody::new( + Arc::new(IdleInterceptor), + BodyLength::Chunk, + box_stream, + chunk_body_bytes.as_bytes(), + ) + .unwrap(); let mut buf = [0u8; 32]; // Read body part @@ -654,6 +696,7 @@ mod ut_async_http_body { "; let mut chunk = HttpBody::new( + Arc::new(IdleInterceptor), BodyLength::Chunk, box_stream, chunk_body_no_trailer_bytes.as_bytes(), @@ -686,7 +729,12 @@ mod ut_async_http_body { let box_stream = Box::new("".as_bytes()); let content_bytes = "hello"; - match HttpBody::new(BodyLength::Empty, box_stream, content_bytes.as_bytes()) { + match HttpBody::new( + Arc::new(IdleInterceptor), + BodyLength::Empty, + box_stream, + content_bytes.as_bytes(), + ) { Ok(_) => (), Err(e) => assert_eq!(e.error_kind(), ErrorKind::Request), } @@ -710,8 +758,13 @@ mod ut_async_http_body { let box_stream = Box::new("hello world".as_bytes()); let content_bytes = ""; - let mut text = - HttpBody::new(BodyLength::Length(11), box_stream, content_bytes.as_bytes()).unwrap(); + let mut text = HttpBody::new( + Arc::new(IdleInterceptor), + BodyLength::Length(11), + box_stream, + content_bytes.as_bytes(), + ) + .unwrap(); let mut buf = [0u8; 5]; // Read body part @@ -727,8 +780,13 @@ mod ut_async_http_body { let box_stream = Box::new("".as_bytes()); let content_bytes = "hello"; - let mut text = - HttpBody::new(BodyLength::Length(5), box_stream, content_bytes.as_bytes()).unwrap(); + let mut text = HttpBody::new( + Arc::new(IdleInterceptor), + BodyLength::Length(5), + box_stream, + content_bytes.as_bytes(), + ) + .unwrap(); let mut buf = [0u8; 32]; // Read body part @@ -756,8 +814,13 @@ mod ut_async_http_body { let box_stream = Box::new("hello world".as_bytes()); let content_bytes = ""; - let mut until_close = - HttpBody::new(BodyLength::UntilClose, box_stream, content_bytes.as_bytes()).unwrap(); + let mut until_close = HttpBody::new( + Arc::new(IdleInterceptor), + BodyLength::UntilClose, + box_stream, + content_bytes.as_bytes(), + ) + .unwrap(); let mut buf = [0u8; 5]; // Read body part @@ -777,8 +840,13 @@ mod ut_async_http_body { let box_stream = Box::new("".as_bytes()); let content_bytes = "hello"; - let mut until_close = - HttpBody::new(BodyLength::UntilClose, box_stream, content_bytes.as_bytes()).unwrap(); + let mut until_close = HttpBody::new( + Arc::new(IdleInterceptor), + BodyLength::UntilClose, + box_stream, + content_bytes.as_bytes(), + ) + .unwrap(); let mut buf = [0u8; 5]; // Read body part diff --git a/ylong_http_client/src/async_impl/interceptor/mod.rs b/ylong_http_client/src/async_impl/interceptor/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..27edf37f7450361a36072fb975339510ba3e6d7f --- /dev/null +++ b/ylong_http_client/src/async_impl/interceptor/mod.rs @@ -0,0 +1,131 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Http network interceptor. + +use std::net::SocketAddr; + +use ylong_http::response::Response as HttpResp; + +use crate::async_impl::{HttpBody, Request, Response}; +use crate::HttpClientError; + +pub(crate) type Interceptors = dyn Interceptor + Sync + Send + 'static; + +/// Transport layer protocol type. +#[derive(Clone)] +pub enum ConnProtocol { + /// Tcp protocol. + Tcp, + /// Udp Protocol. + Udp, +} + +/// Tcp connection information. +#[derive(Clone)] +pub struct ConnDetail { + /// Transport layer protocol type. + pub(crate) protocol: ConnProtocol, + /// local socket address. + pub(crate) local: SocketAddr, + /// peer socket address. + pub(crate) peer: SocketAddr, + /// peer domain information. + pub(crate) addr: String, + /// Whether to use proxy. + pub(crate) proxy: bool, +} + +impl ConnDetail { + /// Gets the transport layer protocol for the connection. + pub fn protocol(&self) -> &ConnProtocol { + &self.protocol + } + + /// Gets the local socket address of the connection. + pub fn local(&self) -> SocketAddr { + self.local + } + + /// Gets the peer socket address of the connection. + pub fn peer(&self) -> SocketAddr { + self.peer + } + + /// Gets the peer domain address of the connection. + pub fn addr(&self) -> &str { + &self.addr + } + + /// Whether to use proxy. + pub fn proxy(&self) -> bool { + self.proxy + } +} + +/// Network interceptor. +/// +/// Provides intercepting behavior at various stages of http message passing. +pub trait Interceptor { + /// Intercepts the created transport layer protocol. + // TODO add cache and response interceptor. + // Is it necessary to add a response interceptor? + // Does the input and output interceptor need to be added to http2 or http3 + // encoded packets? + fn intercept_connection(&self, _info: ConnDetail) -> Result<(), HttpClientError> { + Ok(()) + } + + /// Intercepts the input of transport layer io. + fn intercept_input(&self, _bytes: &[u8]) -> Result<(), HttpClientError> { + Ok(()) + } + + /// Intercepts the output of transport layer io. + fn intercept_output(&self, _bytes: &[u8]) -> Result<(), HttpClientError> { + Ok(()) + } + + /// Intercepts the Request that is eventually transmitted to the peer end. + fn intercept_request(&self, _request: &Request) -> Result<(), HttpClientError> { + Ok(()) + } + + /// Intercepts the response that is eventually returned. + fn intercept_response(&self, _request: &Response) -> Result<(), HttpClientError> { + Ok(()) + } + + /// Intercepts the error cause of the retry. + fn intercept_retry(&self, _error: &HttpClientError) -> Result<(), HttpClientError> { + Ok(()) + } + + /// Intercepts the redirect request. + fn intercept_redirect_request(&self, _request: &Request) -> Result<(), HttpClientError> { + Ok(()) + } + + /// Intercepts the response returned by the redirect + fn intercept_redirect_response( + &self, + _response: &HttpResp, + ) -> Result<(), HttpClientError> { + Ok(()) + } +} + +/// The default Interceptor does not do any intercepting. +pub(crate) struct IdleInterceptor; + +impl Interceptor for IdleInterceptor {} diff --git a/ylong_http_client/src/async_impl/mod.rs b/ylong_http_client/src/async_impl/mod.rs index 32fd32420853af6373211e6ec733cf19a08b4dcc..e0baee764749452338247a5f11c4895ede136348 100644 --- a/ylong_http_client/src/async_impl/mod.rs +++ b/ylong_http_client/src/async_impl/mod.rs @@ -27,6 +27,7 @@ mod client; mod connector; mod downloader; mod http_body; +mod interceptor; mod request; mod response; mod timeout; @@ -42,6 +43,7 @@ pub use client::ClientBuilder; pub use connector::{Connector, HttpConnector}; pub use downloader::{DownloadOperator, Downloader, DownloaderBuilder}; pub use http_body::HttpBody; +pub use interceptor::{ConnDetail, ConnProtocol, Interceptor}; pub use request::{Body, Request, RequestBuilder}; pub use response::Response; pub use uploader::{UploadOperator, Uploader, UploaderBuilder}; diff --git a/ylong_http_client/src/async_impl/request.rs b/ylong_http_client/src/async_impl/request.rs index 5b730eb49dd7699df687450251572cd3f9a22c25..c6420209bc265416fbf1fe0ca98d18c2c9c10c21 100644 --- a/ylong_http_client/src/async_impl/request.rs +++ b/ylong_http_client/src/async_impl/request.rs @@ -17,10 +17,12 @@ use core::ops::{Deref, DerefMut}; use core::pin::Pin; use core::task::{Context, Poll}; use std::io::Cursor; +use std::sync::Arc; use ylong_http::body::MultiPartBase; use ylong_http::request::{Request as Req, RequestBuilder as ReqBuilder}; +use crate::async_impl::interceptor::Interceptors; use crate::error::{ErrorKind, HttpClientError}; use crate::runtime::{AsyncRead, ReadBuf}; @@ -370,6 +372,11 @@ impl AsyncRead for Body { } } +pub(crate) struct Message<'a> { + pub(crate) request: &'a mut Request, + pub(crate) interceptor: Arc, +} + #[cfg(feature = "ylong_base")] fn poll_read_cursor( cursor: &mut Cursor>, diff --git a/ylong_http_client/src/async_impl/timeout.rs b/ylong_http_client/src/async_impl/timeout.rs index 30d54510e26bfdd2e74b043264e4c9763957a612..a555c390518186dc79eb4cc0ef8ae6ef321fd1fa 100644 --- a/ylong_http_client/src/async_impl/timeout.rs +++ b/ylong_http_client/src/async_impl/timeout.rs @@ -61,10 +61,13 @@ where #[cfg(all(test, feature = "ylong_base"))] mod ut_timeout { + use std::sync::Arc; + use ylong_http::response::status::StatusCode; use ylong_http::response::{Response, ResponsePart}; use ylong_http::version::Version; + use crate::async_impl::interceptor::IdleInterceptor; use crate::async_impl::timeout::TimeoutFuture; use crate::async_impl::HttpBody; use crate::util::normalizer::BodyLength; @@ -84,7 +87,13 @@ mod ut_timeout { status: StatusCode::OK, headers: Default::default(), }; - let body = HttpBody::new(BodyLength::Empty, Box::new([].as_slice()), &[]).unwrap(); + let body = HttpBody::new( + Arc::new(IdleInterceptor), + BodyLength::Empty, + Box::new([].as_slice()), + &[], + ) + .unwrap(); Ok(crate::async_impl::Response::new(Response::from_raw_parts( part, body, )))