diff --git a/ylong_http/Cargo.toml b/ylong_http/Cargo.toml index d231a03827861d41b85b1ec2df7c6da399f6da35..21ecf0b8b817d16aa9ec20058ca6af7ca79f45f5 100644 --- a/ylong_http/Cargo.toml +++ b/ylong_http/Cargo.toml @@ -25,8 +25,8 @@ tokio_base = ["tokio"] # Uses asynchronous components of `tokio` ylong_base = ["ylong_runtime"] # Uses asynchronous components of `ylong` [dependencies] -tokio = { version = "1.20.1", features = ["io-util"], optional = true } -ylong_runtime = { git = "https://gitee.com/openharmony/commonlibrary_rust_ylong_runtime.git", optional = true } +tokio = { version = "1.20.1", features = ["io-util", "fs"], optional = true } +ylong_runtime = { git = "https://gitee.com/openharmony/commonlibrary_rust_ylong_runtime.git", features = ["fs", "sync"], optional = true } [dev-dependencies] tokio = { version = "1.20.1", features = ["io-util", "rt-multi-thread", "macros"] } diff --git a/ylong_http/src/body/mime/simple.rs b/ylong_http/src/body/mime/simple.rs index b5aa7b031ebabb14a3f6eb5d251e424e5d9f3b79..35c0864ea799ffae8bfa26c34f20e4913515baba 100644 --- a/ylong_http/src/body/mime/simple.rs +++ b/ylong_http/src/body/mime/simple.rs @@ -13,12 +13,13 @@ // TODO: reuse mime later. +use std::future::Future; use std::io::Cursor; use std::pin::Pin; use std::task::{Context, Poll}; use std::vec::IntoIter; -use crate::body::async_impl::Body; +use crate::body::async_impl::{Body, ReusableReader}; use crate::{AsyncRead, ReadBuf}; /// A structure that helps you build a `multipart/form-data` message. @@ -174,10 +175,23 @@ impl MultiPart { states.push(MultiPartState::bytes( format!("--{}--\r\n", self.boundary).into_bytes(), )); - self.status = ReadStatus::Reading(MultiPartStates { - states: states.into_iter(), - curr: None, - }) + self.status = ReadStatus::Reading(MultiPartStates { states, index: 0 }) + } + + pub(crate) async fn reuse_inner(&mut self) -> std::io::Result<()> { + match std::mem::replace(&mut self.status, ReadStatus::Never) { + ReadStatus::Never => Ok(()), + ReadStatus::Reading(mut states) => { + let res = states.reuse().await; + self.status = ReadStatus::Reading(states); + res + } + ReadStatus::Finish(mut states) => { + states.reuse().await?; + self.status = ReadStatus::Reading(states); + Ok(()) + } + } } } @@ -196,7 +210,7 @@ impl AsyncRead for MultiPart { match self.status { ReadStatus::Never => self.build_status(), ReadStatus::Reading(_) => {} - ReadStatus::Finish => return Poll::Ready(Ok(())), + ReadStatus::Finish(_) => return Poll::Ready(Ok(())), } let status = if let ReadStatus::Reading(ref mut status) = self.status { @@ -213,7 +227,10 @@ impl AsyncRead for MultiPart { Poll::Ready(Ok(())) => { let new_filled = buf.filled().len(); if filled == new_filled { - self.status = ReadStatus::Finish; + match std::mem::replace(&mut self.status, ReadStatus::Never) { + ReadStatus::Reading(states) => self.status = ReadStatus::Finish(states), + _ => unreachable!(), + }; } Poll::Ready(Ok(())) } @@ -230,6 +247,23 @@ impl AsyncRead for MultiPart { } } +impl ReusableReader for MultiPart { + fn reuse<'a>( + &'a mut self, + ) -> Pin> + Send + Sync + 'a>> + where + Self: 'a, + { + Box::pin(async { + match self.status { + ReadStatus::Never => Ok(()), + ReadStatus::Reading(_) => self.reuse_inner().await, + ReadStatus::Finish(_) => self.reuse_inner().await, + } + }) + } +} + /// A structure that represents a part of `multipart/form-data` message. /// /// # Examples @@ -352,8 +386,8 @@ impl Part { /// Sets a stream body of this `Part`. /// /// The body message will be set to the body part. - pub fn stream(mut self, body: T) -> Self { - self.body = Some(MultiPartState::stream(Box::pin(body))); + pub fn stream(mut self, body: T) -> Self { + self.body = Some(MultiPartState::stream(Box::new(body))); self } } @@ -365,7 +399,7 @@ impl Default for Part { } /// A basic trait for MultiPart. -pub trait MultiPartBase: AsyncRead { +pub trait MultiPartBase: ReusableReader { /// Get reference of MultiPart. fn multipart(&self) -> &MultiPart; } @@ -379,12 +413,27 @@ impl MultiPartBase for MultiPart { enum ReadStatus { Never, Reading(MultiPartStates), - Finish, + Finish(MultiPartStates), } struct MultiPartStates { - states: IntoIter, - curr: Option, + states: Vec, + index: usize, +} + +impl MultiPartStates { + async fn reuse(&mut self) -> std::io::Result<()> { + self.index = 0; + for state in self.states.iter_mut() { + match state { + MultiPartState::Bytes(bytes) => bytes.set_position(0), + MultiPartState::Stream(stream) => { + stream.reuse().await?; + } + } + } + Ok(()) + } } impl MultiPartStates { @@ -393,11 +442,11 @@ impl MultiPartStates { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { - let mut state = if let Some(state) = self.curr.take() { - state - } else { - return Poll::Ready(Ok(())); + let state = match self.states.get_mut(self.index) { + Some(state) => state, + None => return Poll::Ready(Ok(())), }; + match state { MultiPartState::Bytes(ref mut bytes) => { let filled_len = buf.filled().len(); @@ -406,39 +455,26 @@ impl MultiPartStates { let new = std::io::Read::read(bytes, unfilled).unwrap(); buf.set_filled(filled_len + new); - if new >= unfilled_len { - self.curr = Some(state); + if new < unfilled_len { + self.index += 1; } Poll::Ready(Ok(())) } - MultiPartState::Stream(ref mut stream) => { + MultiPartState::Stream(stream) => { let old_len = buf.filled().len(); - let result = stream.as_mut().poll_read(cx, buf); + let result = unsafe { Pin::new_unchecked(stream).poll_read(cx, buf) }; let new_len = buf.filled().len(); - self.poll_result(result, old_len, new_len, state) - } - } - } - - fn poll_result( - &mut self, - result: Poll>, - old_len: usize, - new_len: usize, - state: MultiPartState, - ) -> Poll> { - match result { - Poll::Ready(Ok(())) => { - if old_len != new_len { - self.curr = Some(state); + match result { + Poll::Ready(Ok(())) => { + if old_len == new_len { + self.index += 1; + } + Poll::Ready(Ok(())) + } + Poll::Pending => Poll::Pending, + x => x, } - Poll::Ready(Ok(())) } - Poll::Pending => { - self.curr = Some(state); - Poll::Pending - } - x => x, } } } @@ -451,13 +487,9 @@ impl AsyncRead for MultiPartStates { ) -> Poll> { let this = self.get_mut(); while !buf.initialize_unfilled().is_empty() { - if this.curr.is_none() { - this.curr = match this.states.next() { - None => break, - x => x, - } + if this.states.get(this.index).is_none() { + break; } - match this.poll_read_curr(cx, buf) { Poll::Ready(Ok(())) => {} x => return x, @@ -469,7 +501,7 @@ impl AsyncRead for MultiPartStates { enum MultiPartState { Bytes(Cursor>), - Stream(Pin>), + Stream(Box), } impl MultiPartState { @@ -477,7 +509,7 @@ impl MultiPartState { Self::Bytes(Cursor::new(bytes)) } - fn stream(reader: Pin>) -> Self { + fn stream(reader: Box) -> Self { Self::Stream(reader) } } diff --git a/ylong_http/src/body/mod.rs b/ylong_http/src/body/mod.rs index b71d86241f4d14432467ddecf861c48738b300ca..dc87cc4d00f8f3f2c98423d699c447000792d180 100644 --- a/ylong_http/src/body/mod.rs +++ b/ylong_http/src/body/mod.rs @@ -46,6 +46,7 @@ mod empty; mod mime; mod text; +pub use async_impl::ReusableReader; pub use chunk::{Chunk, ChunkBody, ChunkBodyDecoder, ChunkExt, ChunkState, Chunks}; pub use empty::EmptyBody; pub use mime::{ @@ -399,6 +400,40 @@ pub mod async_impl { Pin::new(&mut *fut.body).poll_data(cx, fut.buf) } } + + /// The reuse trait of request body. + pub trait ReusableReader: AsyncRead + Sync { + /// Reset body state, Ensure that the body can be re-read. + fn reuse<'a>( + &'a mut self, + ) -> Pin> + Send + Sync + 'a>> + where + Self: 'a; + } + + impl ReusableReader for crate::File { + fn reuse<'a>( + &'a mut self, + ) -> Pin> + Send + Sync + 'a>> + where + Self: 'a, + { + use crate::AsyncSeekExt; + + Box::pin(async { self.rewind().await.map(|_| ()) }) + } + } + + impl ReusableReader for &[u8] { + fn reuse<'a>( + &'a mut self, + ) -> Pin> + Send + Sync + 'a>> + where + Self: 'a, + { + Box::pin(async { Ok(()) }) + } + } } // Type definitions of the origin of the body data. diff --git a/ylong_http/src/lib.rs b/ylong_http/src/lib.rs index b99a3b36e594bf0ccc678f9d4f35b016e8933298..6af6bfad7cadcde8168e776983f928551009ebfa 100644 --- a/ylong_http/src/lib.rs +++ b/ylong_http/src/lib.rs @@ -45,6 +45,12 @@ pub mod version; pub(crate) mod util; #[cfg(feature = "tokio_base")] -pub(crate) use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf}; +pub(crate) use tokio::{ + fs::File, + io::{AsyncRead, AsyncReadExt, AsyncSeekExt, ReadBuf}, +}; #[cfg(feature = "ylong_base")] -pub(crate) use ylong_runtime::io::{AsyncRead, AsyncReadExt, ReadBuf}; +pub(crate) use ylong_runtime::{ + fs::File, + io::{AsyncRead, AsyncReadExt, AsyncSeekExt, ReadBuf}, +}; diff --git a/ylong_http_client/src/async_impl/client.rs b/ylong_http_client/src/async_impl/client.rs index b894a3214ba9aea938c697f63d16ef87e45c8eca..0a11156bb3507c3ccb3ded7501c6648237970f63 100644 --- a/ylong_http_client/src/async_impl/client.rs +++ b/ylong_http_client/src/async_impl/client.rs @@ -17,7 +17,7 @@ use ylong_http::request::uri::Uri; use super::pool::ConnPool; use super::timeout::TimeoutFuture; -use super::{conn, Body, Connector, HttpConnector, Request, Response}; +use super::{conn, Connector, HttpConnector, Request, Response}; use crate::async_impl::interceptor::{IdleInterceptor, Interceptor, Interceptors}; use crate::async_impl::request::Message; use crate::error::HttpClientError; @@ -36,7 +36,7 @@ use crate::util::redirect::{RedirectInfo, Trigger}; use crate::util::request::RequestArc; #[cfg(feature = "__c_openssl")] use crate::CertVerifier; -use crate::Retry; +use crate::{ErrorKind, Retry}; /// HTTP asynchronous client implementation. Users can use `async_impl::Client` /// to send `Request` asynchronously. @@ -147,7 +147,7 @@ impl Client { loop { let response = self.send_request(request.clone()).await; if let Err(ref err) = response { - if retries > 0 && request.ref_mut().body_mut().reuse() { + if retries > 0 && request.ref_mut().body_mut().reuse().await.is_ok() { self.interceptors.intercept_retry(err)?; retries -= 1; continue; @@ -217,9 +217,12 @@ impl Client { { Trigger::NextLink => { // Here the body should be reused. - if !request.ref_mut().body_mut().reuse() { - *request.ref_mut().body_mut() = Body::empty(); - } + request + .ref_mut() + .body_mut() + .reuse() + .await + .map_err(|e| HttpClientError::from_io_error(ErrorKind::Redirect, e))?; self.interceptors .intercept_redirect_request(request.ref_mut())?; response = self.send_unformatted_request(request.clone()).await?; diff --git a/ylong_http_client/src/async_impl/request.rs b/ylong_http_client/src/async_impl/request.rs index 1a61eaa8af9c481f90d5809328a6508d30fb1761..9c41c6e294ceca5b63a7d620e81e3827510d382a 100644 --- a/ylong_http_client/src/async_impl/request.rs +++ b/ylong_http_client/src/async_impl/request.rs @@ -19,6 +19,7 @@ use core::task::{Context, Poll}; use std::io::Cursor; use std::sync::Arc; +use ylong_http::body::async_impl::ReusableReader; use ylong_http::body::MultiPartBase; use ylong_http::request::uri::PercentEncoder as PerEncoder; use ylong_http::request::{Request as Req, RequestBuilder as ReqBuilder}; @@ -255,7 +256,7 @@ pub struct Body { pub(crate) enum BodyKind { Empty, Slice(Cursor>), - Stream(Box), + Stream(Box), Multipart(Box), } @@ -304,10 +305,10 @@ impl Body { /// ``` pub fn stream(stream: T) -> Self where - T: AsyncRead + Send + Sync + Unpin + 'static, + T: ReusableReader + Send + Sync + Unpin + 'static, { Body::new(BodyKind::Stream( - Box::new(stream) as Box + Box::new(stream) as Box )) } @@ -340,15 +341,15 @@ impl Body { Self { inner } } - // TODO: Considers reusing unread stream ? - pub(crate) fn reuse(&mut self) -> bool { + pub(crate) async fn reuse(&mut self) -> std::io::Result<()> { match self.inner { - BodyKind::Empty => true, + BodyKind::Empty => Ok(()), BodyKind::Slice(ref mut slice) => { slice.set_position(0); - true + Ok(()) } - _ => false, + BodyKind::Stream(ref mut stream) => stream.reuse().await, + BodyKind::Multipart(ref mut multipart) => multipart.reuse().await, } } } @@ -470,7 +471,6 @@ mod ut_client_request { .length(Some(4)), ); let mut request = RequestBuilder::default().body(Body::multipart(mp)).unwrap(); - assert!(!request.body_mut().reuse()); let handle = ylong_runtime::spawn(async move { let mut buf = vec![0u8; 50]; let mut v_size = vec![]; diff --git a/ylong_http_client/src/async_impl/uploader/mod.rs b/ylong_http_client/src/async_impl/uploader/mod.rs index 1900b88d4ea4575f7343a35eb6047df58327d612..068e0424e2d66db41976d5bcaa61eb04cc6022ff 100644 --- a/ylong_http_client/src/async_impl/uploader/mod.rs +++ b/ylong_http_client/src/async_impl/uploader/mod.rs @@ -14,11 +14,13 @@ mod builder; mod operator; +use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; pub use builder::{UploaderBuilder, WantsReader}; pub use operator::{Console, UploadOperator}; +use ylong_http::body::async_impl::ReusableReader; use ylong_http::body::{MultiPart, MultiPartBase}; use crate::runtime::{AsyncRead, ReadBuf}; @@ -90,7 +92,7 @@ pub struct Uploader { info: Option, } -impl Uploader { +impl Uploader { /// Creates an `Uploader` with a `Console` operator which displays process /// on console. /// @@ -123,7 +125,7 @@ impl Uploader<(), ()> { impl AsyncRead for Uploader where - R: AsyncRead + Unpin, + R: ReusableReader + Unpin, T: UploadOperator + Unpin, { fn poll_read( @@ -159,7 +161,23 @@ where } } -impl MultiPartBase for Uploader { +impl ReusableReader for Uploader +where + R: ReusableReader + Unpin, + T: UploadOperator + Unpin + Sync, +{ + fn reuse<'a>( + &'a mut self, + ) -> Pin> + Send + Sync + 'a>> + where + Self: 'a, + { + self.info = None; + self.reader.reuse() + } +} + +impl MultiPartBase for Uploader { fn multipart(&self) -> &MultiPart { &self.reader } diff --git a/ylong_http_client/src/lib.rs b/ylong_http_client/src/lib.rs index 01378610e693e3dbea57487d2b03eaa30e8c9cd3..443affdae2cfe8973d78e03ecafaad6158e759da 100644 --- a/ylong_http_client/src/lib.rs +++ b/ylong_http_client/src/lib.rs @@ -22,7 +22,7 @@ // ylong_http crate re-export. #[cfg(any(feature = "ylong_base", feature = "tokio_base"))] -pub use ylong_http::body::{EmptyBody, TextBody}; +pub use ylong_http::body::{EmptyBody, ReusableReader, TextBody}; pub use ylong_http::headers::{ Header, HeaderName, HeaderValue, HeaderValueIter, HeaderValueIterMut, Headers, HeadersIntoIter, HeadersIter, HeadersIterMut,