diff --git a/ylong_http_client/src/async_impl/connector.rs b/ylong_http_client/src/async_impl/connector.rs index c395011b4eaacb87a13b11aa0c8e71fcfe209b98..ebcde1a6595ee7fd76e986a0906c3c02bcb248a6 100644 --- a/ylong_http_client/src/async_impl/connector.rs +++ b/ylong_http_client/src/async_impl/connector.rs @@ -13,92 +13,104 @@ * limitations under the License. */ +//! Asynchronous `Connector` trait and `HttpConnector` implementation. + use crate::util::ConnectorConfig; -use crate::{AsyncRead, AsyncWrite}; +use crate::{AsyncRead, AsyncWrite, TcpStream, Uri}; use core::future::Future; -use ylong_http::request::uri::Uri; +use std::error::Error; +use std::io; +use std::net::ToSocketAddrs; /// `Connector` trait used by `async_impl::Client`. `Connector` provides /// asynchronous connection establishment interfaces. -#[rustfmt::skip] pub trait Connector { - /// The type of stream that this connector produces. This must be an async stream that implements AsyncRead, AsyncWrite, and is also Send + Sync. + /// Streams that this connector produces. type Stream: AsyncRead + AsyncWrite + Unpin + Sync + Send + 'static; - /// The type of errors that this connector can produce when trying to create a stream. - type Error: Into>; - /// The future type returned by this connector when attempting to create a stream. + /// Possible errors that this connector may generate when attempting to connect. + type Error: Into>; + /// Futures generated by this connector when attempting to create a stream. type Future: Future> + Unpin + Sync + Send + 'static; /// Attempts to establish a connection. fn connect(&self, uri: &Uri) -> Self::Future; } -/// Connector for creating HTTP connections asynchronously. +/// Connector for creating HTTP or HTTPS connections asynchronously. /// /// `HttpConnector` implements `async_impl::Connector` trait. +#[derive(Default)] pub struct HttpConnector { config: ConnectorConfig, } impl HttpConnector { - /// Creates a new `HttpConnector`. + /// Creates a new `HttpConnector` with a `ConnectorConfig`. pub(crate) fn new(config: ConnectorConfig) -> HttpConnector { HttpConnector { config } } } -impl Default for HttpConnector { - fn default() -> Self { - Self::new(ConnectorConfig::default()) +// TODO: Fix this function after `ylong_runtime::TcpStream` support set_nodelay. +async fn tcp_stream(addr: &str) -> io::Result { + // Here `addr` must contain a value if `to_socket_addrs` return `Ok`. + let addr = addr.to_socket_addrs()?.next().unwrap(); + + #[cfg(feature = "tokio_base")] + { + TcpStream::connect(addr) + .await + .and_then(|stream| match stream.set_nodelay(true) { + Ok(()) => Ok(stream), + Err(e) => Err(e), + }) } + + #[cfg(feature = "ylong_base")] + TcpStream::connect(addr).await } #[cfg(not(feature = "__tls"))] -pub(crate) mod no_tls { - use crate::async_impl::Connector; - use crate::TcpStream; - use core::{future::Future, pin::Pin}; +mod no_tls { + use super::{tcp_stream, Connector, HttpConnector}; + use crate::{TcpStream, Uri}; + use core::future::Future; + use core::pin::Pin; use std::io::Error; - use ylong_http::request::uri::Uri; - impl Connector for super::HttpConnector { + impl Connector for HttpConnector { type Stream = TcpStream; type Error = Error; type Future = Pin> + Sync + Send>>; fn connect(&self, uri: &Uri) -> Self::Future { - let addr = if let Some(proxy) = self.config.proxies.match_proxy(uri) { - proxy.via_proxy(uri).authority().unwrap().to_string() - } else { - uri.authority().unwrap().to_string() - }; - Box::pin(async move { - TcpStream::connect(addr) - .await - .and_then(|stream| match stream.set_nodelay(true) { - Ok(()) => Ok(stream), - Err(e) => Err(e), - }) - }) + // Checks if this uri need be proxied. + let addr = self + .config + .proxies + .match_proxy(uri) + .map(|proxy| proxy.via_proxy(uri).authority().unwrap().to_string()) + .unwrap_or(uri.authority().unwrap().to_string()); + + Box::pin(async move { tcp_stream(&addr).await }) } } } #[cfg(feature = "__tls")] -pub(crate) mod tls_conn { - use crate::{ - async_impl::{AsyncSslStream, Connector, MixStream}, - ErrorKind, HttpClientError, - }; - use crate::{AsyncReadExt, AsyncWriteExt, TcpStream}; - use core::{future::Future, pin::Pin}; - use std::io::Write; - use ylong_http::request::uri::{Scheme, Uri}; - - impl Connector for super::HttpConnector { +mod tls { + use super::{tcp_stream, Connector, HttpConnector}; + use crate::async_impl::ssl_stream::{AsyncSslStream, MixStream}; + use crate::error::CauseMessage; + use crate::{AsyncReadExt, AsyncWriteExt, Scheme, TcpStream, Uri}; + use core::future::Future; + use core::pin::Pin; + use std::io::{Error, ErrorKind, Write}; + + impl Connector for HttpConnector { type Stream = MixStream; - type Error = HttpClientError; + type Error = Error; type Future = Pin> + Sync + Send>>; @@ -121,58 +133,34 @@ pub(crate) mod tls_conn { is_proxy = true; } - let host_name = match uri.host() { - Some(host) => host.to_string(), - None => "no host in uri".to_string(), - }; + let host_name = uri + .host() + .map(|host| host.to_string()) + .unwrap_or_else(|| "no host in uri".to_string()); match *uri.scheme().unwrap() { - Scheme::HTTP => Box::pin(async move { - let stream = TcpStream::connect(addr) - .await - .and_then(|stream| match stream.set_nodelay(true) { - Ok(()) => Ok(stream), - Err(e) => Err(e), - }) - .map_err(|e| { - HttpClientError::new_with_cause(ErrorKind::Connect, Some(e)) - })?; - Ok(MixStream::Http(stream)) - }), + Scheme::HTTP => { + Box::pin(async move { Ok(MixStream::Http(tcp_stream(&addr).await?)) }) + } Scheme::HTTPS => { let config = self.config.tls.clone(); Box::pin(async move { - let tcp_stream = TcpStream::connect(addr) - .await - .and_then(|stream| match stream.set_nodelay(true) { - Ok(()) => Ok(stream), - Err(e) => Err(e), - }) - .map_err(|e| { - HttpClientError::new_with_cause(ErrorKind::Connect, Some(e)) - })?; - - let tcp_stream = if is_proxy { - tunnel(tcp_stream, host, port, auth).await? - } else { - tcp_stream + let mut tcp = tcp_stream(&addr).await?; + + if is_proxy { + tcp = tunnel(tcp, host, port, auth).await?; }; - let mut tls_ssl = config.ssl().map_err(|e| { - HttpClientError::new_with_cause(ErrorKind::Connect, Some(e)) - })?; - - tls_ssl.set_sni_verify(&host_name).map_err(|e| { - HttpClientError::new_with_cause(ErrorKind::Connect, Some(e)) - })?; - - let mut stream = AsyncSslStream::new(tls_ssl.into_inner(), tcp_stream) - .map_err(|e| { - HttpClientError::new_with_cause(ErrorKind::Connect, Some(e)) - })?; - Pin::new(&mut stream).connect().await.map_err(|e| { - HttpClientError::new_with_cause(ErrorKind::Connect, Some(e)) - })?; + let mut stream = config + .ssl() + .and_then(|mut ssl| ssl.set_sni_verify(&host_name).map(|_| ssl)) + .and_then(|ssl| AsyncSslStream::new(ssl.into_inner(), tcp)) + .map_err(|e| Error::new(ErrorKind::Other, e))?; + + Pin::new(&mut stream) + .connect() + .await + .map_err(|e| Error::new(ErrorKind::Other, e))?; Ok(MixStream::Https(stream)) }) } @@ -185,40 +173,30 @@ pub(crate) mod tls_conn { host: String, port: u16, auth: Option, - ) -> Result { + ) -> Result { let mut req = Vec::new(); - // `unwrap()` never failed here. write!( &mut req, "CONNECT {host}:{port} HTTP/1.1\r\nHost: {host}:{port}\r\n" - ) - .unwrap(); + )?; if let Some(value) = auth { - write!(&mut req, "Proxy-Authorization: Basic {value}\r\n").unwrap(); + write!(&mut req, "Proxy-Authorization: Basic {value}\r\n")?; } - write!(&mut req, "\r\n").unwrap(); + write!(&mut req, "\r\n")?; - conn.write_all(&req) - .await - .map_err(|e| HttpClientError::new_with_cause(ErrorKind::Connect, Some(e)))?; + conn.write_all(&req).await?; let mut buf = [0; 8192]; let mut pos = 0; loop { - let n = conn - .read(&mut buf[pos..]) - .await - .map_err(|e| HttpClientError::new_with_cause(ErrorKind::Connect, Some(e)))?; + let n = conn.read(&mut buf[pos..]).await?; if n == 0 { - return Err(HttpClientError::new_with_message( - ErrorKind::Connect, - "Error receiving from proxy", - )); + return Err(other_io_error("error receiving from proxy")); } pos += n; @@ -228,22 +206,17 @@ pub(crate) mod tls_conn { return Ok(conn); } if pos == buf.len() { - return Err(HttpClientError::new_with_message( - ErrorKind::Connect, - "proxy headers too long for tunnel", - )); + return Err(other_io_error("proxy headers too long for tunnel")); } } else if resp.starts_with(b"HTTP/1.1 407") { - return Err(HttpClientError::new_with_message( - ErrorKind::Connect, - "proxy authentication required", - )); + return Err(other_io_error("proxy authentication required")); } else { - return Err(HttpClientError::new_with_message( - ErrorKind::Connect, - "unsuccessful tunnel", - )); + return Err(other_io_error("unsuccessful tunnel")); } } } + + fn other_io_error(msg: &str) -> Error { + Error::new(ErrorKind::Other, CauseMessage::new(msg)) + } } diff --git a/ylong_http_client/src/async_impl/mod.rs b/ylong_http_client/src/async_impl/mod.rs index 3ef608959855a9f5143bc353c79a1ab098d9eb93..7c7252b3fcd2bf85ab57f816c05736eea97775f0 100644 --- a/ylong_http_client/src/async_impl/mod.rs +++ b/ylong_http_client/src/async_impl/mod.rs @@ -48,8 +48,6 @@ pub(crate) use pool::ConnPool; #[cfg(feature = "__tls")] mod ssl_stream; -#[cfg(feature = "__tls")] -pub use ssl_stream::{AsyncSslStream, MixStream}; // TODO: Remove these later. /// Client Adapter. diff --git a/ylong_http_client/src/async_impl/ssl_stream/c_ssl_stream.rs b/ylong_http_client/src/async_impl/ssl_stream/c_ssl_stream.rs index 18a9d038eeb277908439676f1f6a4281ec0102b5..1655716b269d093ae4baca5a3170fa5e7af8931c 100644 --- a/ylong_http_client/src/async_impl/ssl_stream/c_ssl_stream.rs +++ b/ylong_http_client/src/async_impl/ssl_stream/c_ssl_stream.rs @@ -100,9 +100,13 @@ where }; match check_io_to_poll(s.read(slice))? { Poll::Ready(len) => { + #[cfg(feature = "tokio_base")] unsafe { buf.assume_init(len); } + #[cfg(feature = "ylong_base")] + buf.assume_init(len); + buf.advance(len); Poll::Ready(Ok(())) } diff --git a/ylong_http_client/src/async_impl/ssl_stream/mix.rs b/ylong_http_client/src/async_impl/ssl_stream/mix.rs index bdc3f2e30a08b2e9868efdf70b0ca0fcbf4f6d03..487b8836fee9bba9cc25c638b2bea5aa0fbaedd6 100644 --- a/ylong_http_client/src/async_impl/ssl_stream/mix.rs +++ b/ylong_http_client/src/async_impl/ssl_stream/mix.rs @@ -13,7 +13,7 @@ * limitations under the License. */ -use crate::async_impl::AsyncSslStream; +use crate::async_impl::ssl_stream::AsyncSslStream; use crate::{AsyncRead, AsyncWrite, ReadBuf}; use core::{ pin::Pin, diff --git a/ylong_http_client/src/util/redirect.rs b/ylong_http_client/src/util/redirect.rs index 917e2ca5d8fdcba9b31a36f325e211e23912acad..2b2176a3c41228710f0f23f71ebdf000794f82c6 100644 --- a/ylong_http_client/src/util/redirect.rs +++ b/ylong_http_client/src/util/redirect.rs @@ -145,18 +145,22 @@ impl Redirect { let origin_scheme = request .uri() .scheme() - .ok_or_else(|| HttpClientError::new_with_message( - ErrorKind::Connect, - "No uri scheme in request", - ))? + .ok_or_else(|| { + HttpClientError::new_with_message( + ErrorKind::Connect, + "No uri scheme in request", + ) + })? .as_str(); let auth = request .uri() .authority() - .ok_or_else(|| HttpClientError::new_with_message( - ErrorKind::Connect, - "No uri authority in request", - ))? + .ok_or_else(|| { + HttpClientError::new_with_message( + ErrorKind::Connect, + "No uri authority in request", + ) + })? .to_str(); let origin_auth = auth.as_str(); loc_uri = Uri::builder() @@ -166,19 +170,23 @@ impl Redirect { .path( loc_uri .path() - .ok_or_else(|| HttpClientError::new_with_message( - ErrorKind::Connect, - "No loc_uri path in location", - ))? + .ok_or_else(|| { + HttpClientError::new_with_message( + ErrorKind::Connect, + "No loc_uri path in location", + ) + })? .as_str(), ) .query( loc_uri .query() - .ok_or_else(|| HttpClientError::new_with_message( - ErrorKind::Connect, - "No loc_uri query in location", - ))? + .ok_or_else(|| { + HttpClientError::new_with_message( + ErrorKind::Connect, + "No loc_uri query in location", + ) + })? .as_str(), ) .build()