diff --git a/ylong_http_client/src/async_impl/conn/http1.rs b/ylong_http_client/src/async_impl/conn/http1.rs index 40e115c97f2f25cac437475411e9dc544b45c483..3a6bbff1001620465ac61d6b89e206aa04aedadb 100644 --- a/ylong_http_client/src/async_impl/conn/http1.rs +++ b/ylong_http_client/src/async_impl/conn/http1.rs @@ -11,7 +11,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::future::Future; use std::mem::take; use std::pin::Pin; use std::sync::Arc; @@ -29,7 +28,7 @@ use super::StreamData; use crate::async_impl::request::Message; use crate::async_impl::{HttpBody, Request, Response}; use crate::error::HttpClientError; -use crate::runtime::{poll_fn, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; +use crate::runtime::{poll_fn, AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf}; use crate::util::config::HttpVersion; use crate::util::dispatcher::http1::Http1Conn; use crate::util::information::ConnInfo; @@ -76,14 +75,8 @@ where "Below low speed limit", ))); } - let result = { - let mut read_fut = Box::pin(read_status_line( - &mut conn, - message.request.ref_mut(), - buf.as_mut_slice(), - )); - read_fut.as_mut().poll(cx)? - }; + let result = + read_status_line(cx, &mut conn, message.request.ref_mut(), buf.as_mut_slice())?; if let Poll::Ready(filled) = result { conn.speed_controller.reset_recv_pending_timeout(); return Poll::Ready(Ok(filled)); @@ -91,6 +84,7 @@ where Poll::Pending }) .await?; + message.interceptor.intercept_output(&buf[..size])?; match decoder.decode(&buf[..size]) { Ok(None) => {} @@ -107,29 +101,38 @@ where decode_response(message, part, conn, pre) } -async fn read_status_line( +fn read_status_line( + cx: &mut Context<'_>, conn: &mut Http1Conn, request: &mut Request, buf: &mut [u8], -) -> Result +) -> Poll> where S: AsyncRead + Sync + Send + Unpin + 'static, { - match conn.raw_mut().read(buf).await { - Ok(0) => { - conn.shutdown(); - err_from_msg!(Request, "Tcp closed") - } - Ok(size) => { + let mut read_buf = ReadBuf::new(buf); + match Pin::new(conn.raw_mut()).poll_read(cx, &mut read_buf) { + Poll::Ready(Ok(_)) => { + #[cfg(feature = "ylong_base")] + let size = read_buf.filled_len(); + + #[cfg(feature = "tokio_base")] + let size = read_buf.filled().len(); + + if size == 0 { + conn.shutdown(); + return Poll::Ready(err_from_msg!(Request, "Tcp closed")); + } if request.time_group_mut().transfer_end_time().is_none() { request.time_group_mut().set_transfer_end(Instant::now()) } - Ok(size) + Poll::Ready(Ok(size)) } - Err(e) => { + Poll::Ready(Err(e)) => { conn.shutdown(); - err_from_io!(Request, e) + Poll::Ready(err_from_io!(Request, e)) } + Poll::Pending => Poll::Pending, } } @@ -287,24 +290,33 @@ where if written == buf.len() || end_body { conn.speed_controller.init_min_send_if_not_start(); conn.speed_controller.init_max_send_if_not_start(); - let write_res = poll_fn(|cx| { - if conn.speed_controller.poll_send_pending_timeout(cx) { - return Poll::Ready(Err(HttpClientError::from_str( - BodyTransfer, - "Below low speed limit", - ))); + let mut write_size = 0; + loop { + let write_res = poll_fn(|cx| { + if conn.speed_controller.poll_send_pending_timeout(cx) { + return Poll::Ready(Err(HttpClientError::from_str( + BodyTransfer, + "Below low speed limit", + ))); + } + let write_poll = + Pin::new(conn.raw_mut()).poll_write(cx, &buf[write_size..written]); + if let Poll::Ready(Ok(_)) = write_poll { + conn.speed_controller.reset_send_pending_timeout(); + } + write_poll.map_err(|e| HttpClientError::from_error(BodyTransfer, e)) + }) + .await; + match write_res { + Ok(size) => write_size += size, + Err(e) => { + conn.shutdown(); + return Err(e); + } } - let mut write_fut = Box::pin(conn.raw_mut().write_all(&buf[..written])); - let write_poll = write_fut.as_mut().poll(cx); - if let Poll::Ready(Ok(_)) = write_poll { - conn.speed_controller.reset_send_pending_timeout(); + if write_size == written { + break; } - write_poll.map_err(|e| HttpClientError::from_error(BodyTransfer, e)) - }) - .await; - if let Err(e) = write_res { - conn.shutdown(); - return Err(e); } if conn.speed_controller.need_limit_max_send_speed() { conn.speed_controller.max_send_speed_limit(written).await;