From b02dde2499f068278715a2aa7a55ddef46332912 Mon Sep 17 00:00:00 2001 From: Tiga Ultraman Date: Wed, 27 Sep 2023 15:12:45 +0800 Subject: [PATCH] add costom dns resolver Signed-off-by: Tiga Ultraman --- ylong_http_client/src/async_impl/client.rs | 42 ++++- ylong_http_client/src/async_impl/connector.rs | 68 ++++++-- ylong_http_client/src/async_impl/dns/mod.rs | 25 +++ .../src/async_impl/dns/resolver.rs | 145 ++++++++++++++++++ ylong_http_client/src/async_impl/mod.rs | 3 + ylong_http_client/src/lib.rs | 3 + .../tests/sdv_async_custom_dns_resolver.rs | 84 ++++++++++ 7 files changed, 356 insertions(+), 14 deletions(-) create mode 100644 ylong_http_client/src/async_impl/dns/mod.rs create mode 100644 ylong_http_client/src/async_impl/dns/resolver.rs create mode 100644 ylong_http_client/tests/sdv_async_custom_dns_resolver.rs diff --git a/ylong_http_client/src/async_impl/client.rs b/ylong_http_client/src/async_impl/client.rs index 6b4f6e4..7b2c6f9 100644 --- a/ylong_http_client/src/async_impl/client.rs +++ b/ylong_http_client/src/async_impl/client.rs @@ -11,10 +11,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use ylong_http::body::{ChunkBody, TextBody}; use ylong_http::response::Response; use super::{conn, Body, ConnPool, Connector, HttpBody, HttpConnector}; +use crate::async_impl::dns::{Resolver, DefaultDnsResolver}; use crate::async_impl::timeout::TimeoutFuture; use crate::util::normalizer::{RequestFormatter, UriFormatter}; use crate::util::proxy::Proxies; @@ -73,6 +76,21 @@ impl Client { Self::with_connector(HttpConnector::default()) } + /// Creates a new, default `AsyncClient` with a given dns resolver. + /// # Examples + /// + /// ``` + /// use ylong_http_client::async_impl::{Client, DefaultDnsResolver}; + /// + /// let client = Client::with_dns_resolver(DefaultDnsResolver::default()); + /// ``` + pub fn with_dns_resolver(resolver: R) -> Self + where + R: Resolver, + { + Self::with_connector(HttpConnector::with_dns_resolver(resolver)) + } + /// Creates a new, default [`async_impl::ClientBuilder`]. /// /// [`async_impl::ClientBuilder`]: ClientBuilder @@ -318,6 +336,9 @@ pub struct ClientBuilder { /// Options and flags that is related to `Proxy`. proxies: Proxies, + /// Resolver to http DNS. + resolver: Arc, + /// Options and flags that is related to `TLS`. #[cfg(feature = "__tls")] tls: crate::util::TlsConfigBuilder, @@ -338,7 +359,7 @@ impl ClientBuilder { http: HttpConfig::default(), client: ClientConfig::default(), proxies: Proxies::default(), - + resolver: Arc::new(DefaultDnsResolver::default()), #[cfg(feature = "__tls")] tls: crate::util::TlsConfig::builder(), } @@ -459,6 +480,23 @@ impl ClientBuilder { self } + /// Adds a dns `Resolver` to the `Client`. + /// + /// # Example + /// + /// ``` + /// use ylong_http_client::async_impl::{ClientBuilder, DefaultDnsResolver}; + /// + /// let builder = ClientBuilder::new().dns_resolver(DefaultDnsResolver::default()); + /// ``` + pub fn dns_resolver(mut self, resolver: R) -> Self + where + R: Resolver, + { + self.resolver = Arc::new(resolver); + self + } + /// Constructs a `Client` based on the given settings. /// /// # Examples @@ -475,7 +513,7 @@ impl ClientBuilder { tls: self.tls.build()?, }; - let connector = HttpConnector::new(config); + let connector = HttpConnector::new(config, self.resolver); Ok(Client { inner: ConnPool::new(self.http.clone(), connector), diff --git a/ylong_http_client/src/async_impl/connector.rs b/ylong_http_client/src/async_impl/connector.rs index db23445..2b90a78 100644 --- a/ylong_http_client/src/async_impl/connector.rs +++ b/ylong_http_client/src/async_impl/connector.rs @@ -16,8 +16,10 @@ use core::future::Future; use std::error::Error; use std::io; -use std::net::ToSocketAddrs; +use std::net::SocketAddr; +use std::sync::Arc; +use crate::async_impl::dns::{Resolver, DefaultDnsResolver}; use crate::util::ConnectorConfig; use crate::{AsyncRead, AsyncWrite, TcpStream, Uri}; @@ -39,23 +41,41 @@ pub trait Connector { /// Connector for creating HTTP or HTTPS connections asynchronously. /// /// `HttpConnector` implements `async_impl::Connector` trait. -#[derive(Default)] pub struct HttpConnector { config: ConnectorConfig, + resolver: Arc, +} + +impl Default for HttpConnector { + fn default() -> Self { + Self { + config: Default::default(), + resolver: Arc::new(DefaultDnsResolver::default()), + } + } } impl HttpConnector { /// Creates a new `HttpConnector` with a `ConnectorConfig`. - pub(crate) fn new(config: ConnectorConfig) -> HttpConnector { - HttpConnector { config } + pub(crate) fn new(config: ConnectorConfig, resolver: Arc) -> Self { + Self { config, resolver } + } + + /// Creates a new `HttpConnector` with a given dns `Resolver`. + pub(crate) fn with_dns_resolver(resolver: R) -> Self + where + R: Resolver, + { + let resolver = Arc::new(resolver) as Arc; + Self { + config: Default::default(), + resolver, + } } } // TODO: Fix this function after `ylong_runtime::TcpStream` support set_nodelay. -async fn tcp_stream(addr: &str) -> io::Result { - // Here `addr` must contain a value if `to_socket_addrs` return `Ok`. - let addr = addr.to_socket_addrs()?.next().unwrap(); - +async fn tcp_stream(addr: SocketAddr) -> io::Result { #[cfg(feature = "tokio_base")] { TcpStream::connect(addr) @@ -74,7 +94,7 @@ async fn tcp_stream(addr: &str) -> io::Result { mod no_tls { use core::future::Future; use core::pin::Pin; - use std::io::Error; + use std::io::{Error, ErrorKind}; use super::{tcp_stream, Connector, HttpConnector}; use crate::{TcpStream, Uri}; @@ -94,7 +114,15 @@ mod no_tls { .map(|proxy| proxy.via_proxy(uri).authority().unwrap().to_string()) .unwrap_or(uri.authority().unwrap().to_string()); - Box::pin(async move { tcp_stream(&addr).await }) + let resolver = self.resolver.clone(); + + Box::pin(async move { + let addr_fut = resolver.resolve(&addr); + let mut socket_addr = addr_fut + .await + .map_err(|e| Error::new(ErrorKind::Interrupted, e))?; + tcp_stream(socket_addr.next().unwrap()).await + }) } } } @@ -142,12 +170,28 @@ mod tls { match *uri.scheme().unwrap() { Scheme::HTTP => { - Box::pin(async move { Ok(MixStream::Http(tcp_stream(&addr).await?)) }) + let resolver = self.resolver.clone(); + + Box::pin(async move { + let addr_fut = resolver.resolve(&addr); + let mut socket_addr = addr_fut + .await + .map_err(|e| Error::new(ErrorKind::Interrupted, e))?; + Ok(MixStream::Http( + tcp_stream(socket_addr.next().unwrap()).await?, + )) + }) } Scheme::HTTPS => { + let resolver = self.resolver.clone(); let config = self.config.tls.clone(); + Box::pin(async move { - let mut tcp = tcp_stream(&addr).await?; + let addr_fut = resolver.resolve(&addr); + let mut socket_addr = addr_fut + .await + .map_err(|e| Error::new(ErrorKind::Interrupted, e))?; + let mut tcp = tcp_stream(socket_addr.next().unwrap()).await?; if is_proxy { tcp = tunnel(tcp, host, port, auth).await?; diff --git a/ylong_http_client/src/async_impl/dns/mod.rs b/ylong_http_client/src/async_impl/dns/mod.rs new file mode 100644 index 0000000..92ed52c --- /dev/null +++ b/ylong_http_client/src/async_impl/dns/mod.rs @@ -0,0 +1,25 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! HTTP dns resolver module. +//! +//! This module defines the dns resolver trait accepted by http clients and +//! provides a default dns resolver implementation. +//! +//! - [`Resolver`]: The dns resolver trait, which users can implement to provide +//! a custom dns resolver. +//! +//! - [`DefaultDnsResolver`]: Default dns resolver. + +mod resolver; +pub use resolver::{Resolver, DefaultDnsResolver}; diff --git a/ylong_http_client/src/async_impl/dns/resolver.rs b/ylong_http_client/src/async_impl/dns/resolver.rs new file mode 100644 index 0000000..597aea6 --- /dev/null +++ b/ylong_http_client/src/async_impl/dns/resolver.rs @@ -0,0 +1,145 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! `Resolver` trait and `DefaultDnsResolver` implementation. + +use std::future::Future; +use std::io; +use std::io::Error; +use std::net::{SocketAddr, ToSocketAddrs}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::vec::IntoIter; + +use crate::JoinHandle; + +/// `SocketAddr` resolved by `Resolver`. +pub type Addrs = Box + Sync + Send>; +/// Possible errors that this resolver may generate when attempting to +/// resolve. +pub type StdError = Box; +/// Futures generated by this resolve when attempting to resolve an address. +pub type SocketFuture = Pin> + Sync + Send>>; + +/// `Resolver` trait used by `async_impl::connector::HttpConnector`. `Resolver` +/// provides asynchronous dns resolve interfaces. +pub trait Resolver: Send + Sync + 'static { + /// resolve domain to a `SocketAddr` `Future`. + fn resolve(&self, domain: &str) -> SocketFuture; +} + +/// `SocketAddr` resolved by `DefaultDnsResolver`. +pub struct ResolvedAddrs { + iter: IntoIter, +} + +impl Iterator for ResolvedAddrs { + type Item = SocketAddr; + + fn next(&mut self) -> Option { + self.iter.next() + } +} + +/// Futures generated by `DefaultDnsResolver`. +pub struct DefaultDnsFuture { + inner: JoinHandle>, +} + +impl Future for DefaultDnsFuture { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.inner).poll(cx).map(|res| match res { + Ok(Ok(addrs)) => Ok(Box::new(addrs) as Addrs), + Ok(Err(err)) => Err(Box::new(err) as StdError), + Err(err) => Err(Box::new(Error::new(io::ErrorKind::Interrupted, err)) as StdError), + }) + } +} + +/// Default dns resolver used by the `Client`。 +#[derive(Default)] +pub struct DefaultDnsResolver {} + +impl Resolver for DefaultDnsResolver { + fn resolve(&self, domain: &str) -> SocketFuture { + let domain: Box = domain.into(); + let blocking = crate::spawn_blocking(move || { + (*domain) + .to_socket_addrs() + .map(|iter| ResolvedAddrs { iter }) + }); + Box::pin(DefaultDnsFuture { inner: blocking }) + } +} + +#[cfg(test)] +mod ut_dns_resolver { + use crate::async_impl::{Resolver, DefaultDnsResolver}; + + /// UT test cases for `FrameDecoder::decode`. + /// + /// # Brief + /// + /// Test a simple complete DATA frame. + /// 1. Creates a `FrameDecoder`. + /// 2. Calls its `FrameDecoder::decode` method. + /// 3. Checks the results. + #[test] + #[cfg(feature = "ylong_base")] + fn ut_resolver_std_dns_resolver_resolve() { + ylong_runtime::block_on(async move { + let resolver = DefaultDnsResolver {}; + let sockets = resolver + .resolve("www.baidu.com:80") + .await + .expect("dns resolver resolve domain occurs err"); + let mut size = 0; + for _socket in sockets.into_iter() { + size += 1; + } + assert!(size > 0); + }); + } + + /// UT test cases for `FrameDecoder::decode`. + /// + /// # Brief + /// + /// Test a simple complete DATA frame. + /// 1. Creates a `FrameDecoder`. + /// 2. Calls its `FrameDecoder::decode` method. + /// 3. Checks the results. + #[test] + #[cfg(feature = "tokio_base")] + fn ut_resolver_std_dns_resolver_resolve() { + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .expect("Build runtime failed."); + runtime.block_on(async move { + let resolver = DefaultDnsResolver {}; + let sockets = resolver + .resolve("www.baidu.com:80") + .await + .expect("dns resolver resolve domain occurs err"); + let mut size = 0; + for _socket in sockets.into_iter() { + size += 1; + } + assert!(size > 0); + }); + } +} diff --git a/ylong_http_client/src/async_impl/mod.rs b/ylong_http_client/src/async_impl/mod.rs index 9d12adb..9533d5f 100644 --- a/ylong_http_client/src/async_impl/mod.rs +++ b/ylong_http_client/src/async_impl/mod.rs @@ -52,4 +52,7 @@ pub type Client = client::Client; // TODO: Remove these later. mod adapter; +mod dns; + pub use adapter::{RequestBuilder, Response}; +pub use dns::{Resolver, DefaultDnsResolver}; diff --git a/ylong_http_client/src/lib.rs b/ylong_http_client/src/lib.rs index 1cad613..b843b67 100644 --- a/ylong_http_client/src/lib.rs +++ b/ylong_http_client/src/lib.rs @@ -64,6 +64,7 @@ pub(crate) use tokio::sync::{ pub(crate) use tokio::{ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}, net::TcpStream, + task::{spawn_blocking, JoinHandle}, time::{sleep, timeout, Sleep}, }; #[cfg(all( @@ -80,5 +81,7 @@ pub(crate) use ylong_runtime::sync::{ pub(crate) use ylong_runtime::{ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}, net::TcpStream, + spawn_blocking, + task::JoinHandle, time::{sleep, timeout, Sleep}, }; diff --git a/ylong_http_client/tests/sdv_async_custom_dns_resolver.rs b/ylong_http_client/tests/sdv_async_custom_dns_resolver.rs new file mode 100644 index 0000000..92ca76e --- /dev/null +++ b/ylong_http_client/tests/sdv_async_custom_dns_resolver.rs @@ -0,0 +1,84 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![cfg(all(feature = "async", feature = "http1_1", feature = "ylong_base"))] + +#[macro_use] +pub mod tcp_server; + +use ylong_http::body::async_impl::Body; +use ylong_http_client::async_impl::{Client, DefaultDnsResolver}; + +use crate::tcp_server::{format_header_str, TcpHandle}; + +/// SDV test cases for `async::Client`. +/// +/// # Brief +/// 1. Starts a tcp server with the ylong_runtime coroutine. +/// 2. Creates an async::Client with dns resolver set by user. +/// 3. The client sends a request message. +/// 4. Verifies the received request on the server. +/// 5. The server sends a response message. +/// 6. Verifies the received response on the client. +/// 7. Shuts down the server. +#[test] +fn sdv_client_custom_dns_resolver() { + let mut handles_vec = vec![]; + + start_tcp_server!( + ASYNC; + ServerNum: 1, + Handles: handles_vec, + Request: { + Method: "GET", + Path: "/data", + Header: "Content-Length", "6", + Body: "Hello!", + }, + Response: { + Status: 200, + Version: "HTTP/1.1", + Header: "Content-Length", "3", + Body: "Hi!", + }, + ); + + let handle = handles_vec.pop().expect("No more handles !"); + + let resolver = DefaultDnsResolver {}; + let client = Client::builder().dns_resolver(resolver).build().unwrap(); + + let shutdown_handle = ylong_runtime::spawn(async move { + async_client_assertions_on_tcp!( + ServerHandle: handle, + ClientRef: client, + Request: { + Method: "GET", + Path: "/data", + Header: "Content-Length", "6", + Body: "Hello!", + }, + Response: { + Status: 200, + Version: "HTTP/1.1", + Header: "Content-Length", "3", + Body: "Hi!", + }, + ); + handle + .server_shutdown + .recv() + .expect("server send order failed !"); + }); + ylong_runtime::block_on(shutdown_handle).expect("Runtime wait for server shutdown failed"); +} -- Gitee