From f07cbae7c5272cd6eafae09b8172e448c4156094 Mon Sep 17 00:00:00 2001 From: Tiga Ultraman Date: Fri, 26 Jul 2024 17:44:33 +0800 Subject: [PATCH 1/2] fix http2 local cache and cpu oversize Signed-off-by: Tiga Ultraman --- ylong_http/src/h2/error.rs | 4 +- ylong_http_client/src/async_impl/client.rs | 18 +- .../src/async_impl/conn/http2.rs | 20 +- ylong_http_client/src/lib.rs | 12 +- ylong_http_client/src/util/config/http.rs | 10 + ylong_http_client/src/util/dispatcher.rs | 137 +++++- ylong_http_client/src/util/h2/manager.rs | 405 ++++++++++++------ ylong_http_client/src/util/h2/mod.rs | 2 +- ylong_http_client/src/util/h2/output.rs | 192 +++++++-- ylong_http_client/src/util/h2/streams.rs | 48 ++- 10 files changed, 623 insertions(+), 225 deletions(-) diff --git a/ylong_http/src/h2/error.rs b/ylong_http/src/h2/error.rs index 99a6033..8ec3279 100644 --- a/ylong_http/src/h2/error.rs +++ b/ylong_http/src/h2/error.rs @@ -29,7 +29,7 @@ use std::convert::{Infallible, TryFrom}; use crate::error::{ErrorKind, HttpError}; /// The http2 error handle implementation -#[derive(Debug, Eq, PartialEq, Clone)] +#[derive(Debug, Eq, PartialEq, Copy, Clone)] pub enum H2Error { /// [`Stream Error`] Handling. /// @@ -45,7 +45,7 @@ pub enum H2Error { /// [`Error Codes`] implementation. /// /// [`Error Codes`]: https://httpwg.org/specs/rfc9113.html#ErrorCodes -#[derive(Debug, Eq, PartialEq, Clone)] +#[derive(Debug, Eq, PartialEq, Copy, Clone)] pub enum ErrorCode { /// The associated condition is not a result of an error. For example, /// a `GOAWAY` might include this code to indicate graceful shutdown of a diff --git a/ylong_http_client/src/async_impl/client.rs b/ylong_http_client/src/async_impl/client.rs index b894a32..658e3b3 100644 --- a/ylong_http_client/src/async_impl/client.rs +++ b/ylong_http_client/src/async_impl/client.rs @@ -498,6 +498,20 @@ impl ClientBuilder { self } + /// Sets allowed max size of local cached frame. + /// + /// # Examples + /// + /// ``` + /// use ylong_http_client::async_impl::ClientBuilder; + /// + /// let config = ClientBuilder::new().allow_cached_frame_num(10); + /// ``` + pub fn allow_cached_frame_num(mut self, num: usize) -> Self { + self.http.http2_config.set_allow_cached_frame_num(num); + self + } + /// Sets the `SETTINGS_MAX_FRAME_SIZE`. /// /// # Examples @@ -575,7 +589,7 @@ impl ClientBuilder { impl ClientBuilder { /// Sets the maximum allowed TLS version for connections. /// - /// By default there's no maximum. + /// By default, there's no maximum. /// /// # Examples /// @@ -592,7 +606,7 @@ impl ClientBuilder { /// Sets the minimum required TLS version for connections. /// - /// By default the TLS backend's own default is used. + /// By default, the TLS backend's own default is used. /// /// # Examples /// diff --git a/ylong_http_client/src/async_impl/conn/http2.rs b/ylong_http_client/src/async_impl/conn/http2.rs index 23cb3ef..4dd2457 100644 --- a/ylong_http_client/src/async_impl/conn/http2.rs +++ b/ylong_http_client/src/async_impl/conn/http2.rs @@ -453,10 +453,10 @@ mod ut_http2 { use crate::async_impl::conn::http2::TextIo; use crate::util::dispatcher::http2::Http2Conn; - let (resp_tx, resp_rx) = crate::runtime::unbounded_channel(); + let (resp_tx, resp_rx) = ylong_runtime::sync::mpsc::bounded_channel(20); let (req_tx, _req_rx) = crate::runtime::unbounded_channel(); let shutdown = Arc::new(AtomicBool::new(false)); - let mut conn: Http2Conn<()> = Http2Conn::new(1, shutdown, req_tx); + let mut conn: Http2Conn<()> = Http2Conn::new(1, 20, shutdown, req_tx); conn.receiver.set_receiver(resp_rx); let mut text_io = TextIo::new(conn); let data_1 = Frame::new( @@ -474,9 +474,19 @@ mod ut_http2 { FrameFlags::new(1), Payload::Data(Data::new(vec![b'a'; 10])), ); - let _ = resp_tx.send(crate::util::dispatcher::http2::RespMessage::Output(data_1)); - let _ = resp_tx.send(crate::util::dispatcher::http2::RespMessage::Output(data_2)); - let _ = resp_tx.send(crate::util::dispatcher::http2::RespMessage::Output(data_3)); + + ylong_runtime::block_on(async { + let _ = resp_tx + .send(crate::util::dispatcher::http2::RespMessage::Output(data_1)) + .await; + let _ = resp_tx + .send(crate::util::dispatcher::http2::RespMessage::Output(data_2)) + .await; + let _ = resp_tx + .send(crate::util::dispatcher::http2::RespMessage::Output(data_3)) + .await; + }); + ylong_runtime::block_on(async { let mut buf = [0_u8; 10]; let mut output_vec = vec![]; diff --git a/ylong_http_client/src/lib.rs b/ylong_http_client/src/lib.rs index 0137861..2f687a9 100644 --- a/ylong_http_client/src/lib.rs +++ b/ylong_http_client/src/lib.rs @@ -72,7 +72,11 @@ pub(crate) mod runtime { io::{split, ReadHalf, WriteHalf}, spawn, sync::{ - mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + mpsc::{ + channel as bounded_channel, error::SendError, unbounded_channel, + Receiver as BoundedReceiver, Sender as BoundedSender, UnboundedReceiver, + UnboundedSender, + }, Mutex as AsyncMutex, MutexGuard, }, task::JoinHandle, @@ -94,7 +98,11 @@ pub(crate) mod runtime { pub(crate) use ylong_runtime::{ spawn, sync::{ - mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + error::SendError, + mpsc::{ + bounded_channel, unbounded_channel, BoundedReceiver, BoundedSender, + UnboundedReceiver, UnboundedSender, + }, Mutex as AsyncMutex, MutexGuard, }, task::JoinHandle, diff --git a/ylong_http_client/src/util/config/http.rs b/ylong_http_client/src/util/config/http.rs index ea6f3b9..2a237b6 100644 --- a/ylong_http_client/src/util/config/http.rs +++ b/ylong_http_client/src/util/config/http.rs @@ -75,6 +75,7 @@ pub(crate) mod http2 { init_conn_window_size: u32, init_stream_window_size: u32, enable_push: bool, + allow_cached_frame_num: usize, } impl H2Config { @@ -106,6 +107,10 @@ pub(crate) mod http2 { self.init_stream_window_size = size; } + pub(crate) fn set_allow_cached_frame_num(&mut self, num: usize) { + self.allow_cached_frame_num = num; + } + /// Gets the SETTINGS_MAX_FRAME_SIZE. pub(crate) fn max_frame_size(&self) -> u32 { self.max_frame_size @@ -132,6 +137,10 @@ pub(crate) mod http2 { pub(crate) fn stream_window_size(&self) -> u32 { self.init_stream_window_size } + + pub(crate) fn allow_cached_frame_num(&self) -> usize { + self.allow_cached_frame_num + } } impl Default for H2Config { @@ -143,6 +152,7 @@ pub(crate) mod http2 { init_conn_window_size: DEFAULT_CONN_WINDOW_SIZE, init_stream_window_size: DEFAULT_STREAM_WINDOW_SIZE, enable_push: false, + allow_cached_frame_num: 5, } } } diff --git a/ylong_http_client/src/util/dispatcher.rs b/ylong_http_client/src/util/dispatcher.rs index c7c23a5..ab5028d 100644 --- a/ylong_http_client/src/util/dispatcher.rs +++ b/ylong_http_client/src/util/dispatcher.rs @@ -148,6 +148,7 @@ pub(crate) mod http1 { #[cfg(feature = "http2")] pub(crate) mod http2 { use std::collections::HashMap; + use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; @@ -157,16 +158,19 @@ pub(crate) mod http2 { use ylong_http::error::HttpError; use ylong_http::h2::{ ErrorCode, Frame, FrameDecoder, FrameEncoder, FrameFlags, Goaway, H2Error, Payload, - Settings, SettingsBuilder, + RstStream, Settings, SettingsBuilder, }; use crate::runtime::{ - unbounded_channel, AsyncRead, AsyncWrite, AsyncWriteExt, UnboundedReceiver, - UnboundedSender, WriteHalf, + bounded_channel, unbounded_channel, AsyncRead, AsyncWrite, AsyncWriteExt, BoundedReceiver, + BoundedSender, SendError, UnboundedReceiver, UnboundedSender, WriteHalf, }; use crate::util::config::H2Config; use crate::util::dispatcher::{ConnDispatcher, Dispatcher}; - use crate::util::h2::{ConnManager, FlowControl, RecvData, RequestWrapper, SendData, Streams}; + use crate::util::h2::{ + ConnManager, FlowControl, H2StreamState, RecvData, RequestWrapper, SendData, + StreamEndState, Streams, + }; use crate::ErrorKind::Request; use crate::{ErrorKind, HttpClientError}; @@ -175,6 +179,9 @@ pub(crate) mod http2 { const DEFAULT_MAX_HEADER_LIST_SIZE: usize = 16 << 20; const DEFAULT_WINDOW_SIZE: u32 = 65535; + pub(crate) type ManagerSendFut = + Pin>> + Send + Sync>>; + pub(crate) enum RespMessage { Output(Frame), OutputExit(DispatchErrorKind), @@ -187,11 +194,11 @@ pub(crate) mod http2 { pub(crate) struct ReqMessage { pub(crate) id: u32, - pub(crate) sender: UnboundedSender, + pub(crate) sender: BoundedSender, pub(crate) request: RequestWrapper, } - #[derive(Debug, Eq, PartialEq, Clone)] + #[derive(Debug, Eq, PartialEq, Copy, Clone)] pub(crate) enum DispatchErrorKind { H2(H2Error), Io(std::io::ErrorKind), @@ -203,6 +210,7 @@ pub(crate) mod http2 { // threads according to HTTP2 syntax. pub(crate) struct Http2Dispatcher { pub(crate) next_stream_id: StreamId, + pub(crate) allow_cached_frames: usize, pub(crate) sender: UnboundedSender, pub(crate) io_shutdown: Arc, pub(crate) handles: Vec>, @@ -212,6 +220,7 @@ pub(crate) mod http2 { pub(crate) struct Http2Conn { // Handle id pub(crate) id: u32, + pub(crate) allow_cached_frames: usize, // Sends frame to StreamController pub(crate) sender: UnboundedSender, pub(crate) receiver: RespReceiver, @@ -224,11 +233,12 @@ pub(crate) mod http2 { // closed. pub(crate) io_shutdown: Arc, // The senders of all connected stream channels of response. - pub(crate) senders: HashMap>, + pub(crate) senders: HashMap>, + pub(crate) curr_message: HashMap, // Stream information on the connection. pub(crate) streams: Streams, // Received GO_AWAY frame. - pub(crate) go_away: Option, + pub(crate) recved_go_away: Option, // The last GO_AWAY frame sent by the client. pub(crate) go_away_sync: GoAwaySync, } @@ -257,7 +267,7 @@ pub(crate) mod http2 { #[derive(Default)] pub(crate) struct RespReceiver { - receiver: Option>, + receiver: Option>, } impl ConnDispatcher @@ -294,10 +304,19 @@ pub(crate) mod http2 { // being. let mut handles = Vec::with_capacity(3); if input_tx.send(settings).is_ok() { - Self::launch(controller, req_rx, input_tx, input_rx, &mut handles, io); + Self::launch( + config.allow_cached_frame_num(), + controller, + req_rx, + input_tx, + input_rx, + &mut handles, + io, + ); } Self { next_stream_id, + allow_cached_frames: config.allow_cached_frame_num(), sender: req_tx, io_shutdown: shutdown_flag, handles, @@ -306,6 +325,7 @@ pub(crate) mod http2 { } fn launch( + allow_num: usize, controller: StreamController, req_rx: UnboundedReceiver, input_tx: UnboundedSender, @@ -313,7 +333,7 @@ pub(crate) mod http2 { handles: &mut Vec>, io: S, ) { - let (resp_tx, resp_rx) = unbounded_channel(); + let (resp_tx, resp_rx) = bounded_channel(allow_num); let (read, write) = crate::runtime::split(io); let settings_sync = Arc::new(Mutex::new(SettingsSync::default())); let send_settings_sync = settings_sync.clone(); @@ -339,9 +359,7 @@ pub(crate) mod http2 { let manager = crate::runtime::spawn(async move { let mut conn_manager = ConnManager::new(settings_sync, input_tx, resp_rx, req_rx, controller); - if let Err(e) = Pin::new(&mut conn_manager).await { - conn_manager.exit_with_error(e); - } + let _ = Pin::new(&mut conn_manager).await; }); handles.push(manager); } @@ -356,7 +374,12 @@ pub(crate) mod http2 { return None; } let sender = self.sender.clone(); - let handle = Http2Conn::new(id, self.io_shutdown.clone(), sender); + let handle = Http2Conn::new( + id, + self.allow_cached_frames, + self.io_shutdown.clone(), + sender, + ); Some(handle) } @@ -379,11 +402,13 @@ pub(crate) mod http2 { impl Http2Conn { pub(crate) fn new( id: u32, + allow_cached_num: usize, io_shutdown: Arc, sender: UnboundedSender, ) -> Self { Self { id, + allow_cached_frames: allow_cached_num, sender, receiver: RespReceiver::default(), io_shutdown, @@ -395,7 +420,7 @@ pub(crate) mod http2 { &mut self, request: RequestWrapper, ) -> Result<(), HttpClientError> { - let (tx, rx) = unbounded_channel::(); + let (tx, rx) = bounded_channel::(self.allow_cached_frames); self.receiver.set_receiver(rx); self.sender .send(ReqMessage { @@ -420,8 +445,9 @@ pub(crate) mod http2 { Self { io_shutdown: shutdown, senders: HashMap::new(), + curr_message: HashMap::new(), streams, - go_away: None, + recved_go_away: None, go_away_sync: GoAwaySync::default(), } } @@ -430,7 +456,7 @@ pub(crate) mod http2 { self.io_shutdown.store(true, Ordering::Release); } - pub(crate) fn go_away_unsent_stream( + pub(crate) fn get_unsent_streams( &mut self, last_stream_id: u32, ) -> Result, H2Error> { @@ -443,21 +469,86 @@ pub(crate) mod http2 { Ok(self.streams.get_go_away_streams(last_stream_id)) } - pub(crate) fn send_message_to_stream(&mut self, stream_id: u32, message: RespMessage) { + pub(crate) fn send_message_to_stream( + &mut self, + cx: &mut Context<'_>, + stream_id: u32, + message: RespMessage, + ) -> Poll> { if let Some(sender) = self.senders.get(&stream_id) { // If the client coroutine has exited, this frame is skipped. - match sender.send(message) { - Ok(_) => {} - Err(_e) => { + let mut tx = { + let sender = sender.clone(); + let ft = async move { sender.send(message).await }; + Box::pin(ft) + }; + + match tx.as_mut().poll(cx) { + Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), + // The current coroutine sending the request exited prematurely. + Poll::Ready(Err(_)) => { self.senders.remove(&stream_id); + Poll::Ready(Err(H2Error::StreamError(stream_id, ErrorCode::NoError))) + } + Poll::Pending => { + self.curr_message.insert(stream_id, tx); + Poll::Pending + } + } + } else { + Poll::Ready(Err(H2Error::StreamError(stream_id, ErrorCode::NoError))) + } + } + + pub(crate) fn poll_blocked_message( + &mut self, + cx: &mut Context<'_>, + input_tx: &UnboundedSender, + ) -> Poll<()> { + let keys: Vec = self.curr_message.keys().cloned().collect(); + let mut blocked = false; + + for key in keys { + if let Some(mut task) = self.curr_message.remove(&key) { + match task.as_mut().poll(cx) { + Poll::Ready(Ok(_)) => {} + // The current coroutine sending the request exited prematurely. + Poll::Ready(Err(_)) => { + self.senders.remove(&key); + if let Some(state) = self.streams.stream_state(key) { + if !matches!(state, H2StreamState::Closed(_)) { + if let StreamEndState::OK = self.streams.send_local_reset(key) { + let rest_payload = + RstStream::new(ErrorCode::NoError.into_code()); + let frame = Frame::new( + key as usize, + FrameFlags::empty(), + Payload::RstStream(rest_payload), + ); + // ignore the send error occurs here in order to finish all + // tasks. + let _ = input_tx.send(frame); + } + } + } + } + Poll::Pending => { + self.curr_message.insert(key, task); + blocked = true; + } } } } + if blocked { + Poll::Pending + } else { + Poll::Ready(()) + } } } impl RespReceiver { - pub(crate) fn set_receiver(&mut self, receiver: UnboundedReceiver) { + pub(crate) fn set_receiver(&mut self, receiver: BoundedReceiver) { self.receiver = Some(receiver); } diff --git a/ylong_http_client/src/util/h2/manager.rs b/ylong_http_client/src/util/h2/manager.rs index 5e4bdb5..4b06c63 100644 --- a/ylong_http_client/src/util/h2/manager.rs +++ b/ylong_http_client/src/util/h2/manager.rs @@ -22,20 +22,29 @@ use ylong_http::h2::{ ErrorCode, Frame, FrameFlags, Goaway, H2Error, Payload, Ping, RstStream, Setting, }; -use crate::runtime::{UnboundedReceiver, UnboundedSender}; +use crate::runtime::{BoundedReceiver, UnboundedReceiver, UnboundedSender}; use crate::util::dispatcher::http2::{ DispatchErrorKind, OutputMessage, ReqMessage, RespMessage, SettingsState, SettingsSync, StreamController, }; use crate::util::h2::streams::{DataReadState, FrameRecvState, StreamEndState}; +#[derive(Copy, Clone)] +enum ManagerState { + Send, + Receive, + Exit(DispatchErrorKind), +} + pub(crate) struct ConnManager { + state: ManagerState, + next_state: ManagerState, // Synchronize SETTINGS frames sent by the client. settings: Arc>, // channel transmitter between manager and io input. input_tx: UnboundedSender, // channel receiver between manager and io output. - resp_rx: UnboundedReceiver, + resp_rx: BoundedReceiver, // channel receiver between manager and stream coroutine. req_rx: UnboundedReceiver, controller: StreamController, @@ -47,42 +56,60 @@ impl Future for ConnManager { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let manager = self.get_mut(); loop { - // Receives a response frame from io output. - match manager.resp_rx.poll_recv(cx) { - #[cfg(feature = "tokio_base")] - Poll::Ready(Some(message)) => match message { - OutputMessage::Output(frame) => { - manager.poll_recv_message(frame)?; - } - // io output occurs error. - OutputMessage::OutputExit(e) => { - manager.manage_resp_error(e)?; - } - }, - #[cfg(feature = "ylong_base")] - Poll::Ready(Ok(message)) => match message { - OutputMessage::Output(frame) => { - manager.poll_recv_message(frame)?; + match manager.state { + ManagerState::Send => { + if manager.poll_blocked_frames(cx).is_pending() { + return Poll::Pending; } - // io output occurs error. - OutputMessage::OutputExit(e) => { - manager.manage_resp_error(e)?; - } - }, - #[cfg(feature = "tokio_base")] - Poll::Ready(None) => { - manager.exit_with_error(DispatchErrorKind::ChannelClosed); - return Poll::Ready(Ok(())); - } - #[cfg(feature = "ylong_base")] - Poll::Ready(Err(_e)) => { - manager.exit_with_error(DispatchErrorKind::ChannelClosed); - return Poll::Ready(Ok(())); } - - Poll::Pending => { - return manager.manage_pending_state(cx); + ManagerState::Receive => { + // Receives a response frame from io output. + match manager.resp_rx.poll_recv(cx) { + #[cfg(feature = "tokio_base")] + Poll::Ready(Some(message)) => match message { + OutputMessage::Output(frame) => { + if manager.poll_recv_message(cx, frame)?.is_pending() { + return Poll::Pending; + } + } + // io output occurs error. + OutputMessage::OutputExit(e) => { + // Note error returned immediately. + if manager.manage_resp_error(cx, e)?.is_pending() { + return Poll::Pending; + } + } + }, + #[cfg(feature = "ylong_base")] + Poll::Ready(Ok(message)) => match message { + OutputMessage::Output(frame) => { + if manager.poll_recv_message(cx, frame)?.is_pending() { + return Poll::Pending; + } + } + // io output occurs error. + OutputMessage::OutputExit(e) => { + if manager.manage_resp_error(cx, e)?.is_pending() { + return Poll::Pending; + } + } + }, + #[cfg(feature = "tokio_base")] + Poll::Ready(None) => { + return manager.poll_channel_closed_exit(cx); + } + #[cfg(feature = "ylong_base")] + Poll::Ready(Err(_e)) => { + return manager.poll_channel_closed_exit(cx); + } + + Poll::Pending => { + // TODO manage error state. + return manager.manage_pending_state(cx); + } + } } + ManagerState::Exit(e) => return Poll::Ready(Err(e)), } } } @@ -92,11 +119,13 @@ impl ConnManager { pub(crate) fn new( settings: Arc>, input_tx: UnboundedSender, - resp_rx: UnboundedReceiver, + resp_rx: BoundedReceiver, req_rx: UnboundedReceiver, controller: StreamController, ) -> Self { Self { + state: ManagerState::Receive, + next_state: ManagerState::Receive, settings, input_tx, resp_rx, @@ -110,7 +139,7 @@ impl ConnManager { cx: &mut Context<'_>, ) -> Poll> { // The manager previously accepted a GOAWAY Frame. - if let Some(code) = self.controller.go_away { + if let Some(code) = self.controller.recved_go_away { self.poll_deal_with_go_away(code)?; } self.controller.streams.window_update_conn(&self.input_tx)?; @@ -167,9 +196,11 @@ impl ConnManager { } fn poll_input_request(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchErrorKind> { - loop { - self.controller.streams.try_consume_pending_concurrency(); - match self.controller.streams.next_stream() { + self.controller.streams.try_consume_pending_concurrency(); + let size = self.controller.streams.pending_stream_num(); + let mut index = 0; + while index < size { + match self.controller.streams.next_pending_stream() { None => { break; } @@ -177,6 +208,7 @@ impl ConnManager { self.input_stream_frame(cx, id)?; } } + index += 1; } Ok(()) } @@ -243,7 +275,11 @@ impl ConnManager { .map_err(|_e| DispatchErrorKind::ChannelClosed) } - fn poll_recv_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> { + fn poll_recv_frame( + &mut self, + cx: &mut Context<'_>, + frame: Frame, + ) -> Poll> { match frame.payload() { Payload::Settings(_settings) => { self.recv_settings_frame(frame)?; @@ -253,19 +289,21 @@ impl ConnManager { } Payload::PushPromise(_) => { // TODO The current settings_enable_push setting is fixed to false. - return Err(H2Error::ConnectionError(ErrorCode::ProtocolError).into()); + return Poll::Ready(Err( + H2Error::ConnectionError(ErrorCode::ProtocolError).into() + )); } Payload::Goaway(_go_away) => { - self.recv_go_away_frame(frame)?; + return self.recv_go_away_frame(cx, frame).map_err(Into::into); } Payload::RstStream(_reset) => { - self.recv_reset_frame(frame)?; + return self.recv_reset_frame(cx, frame).map_err(Into::into); } Payload::Headers(_headers) => { - self.recv_header_frame(frame)?; + return self.recv_header_frame(cx, frame).map_err(Into::into); } Payload::Data(_data) => { - self.recv_data_frame(frame)?; + return self.recv_data_frame(cx, frame).map_err(Into::into); } Payload::WindowUpdate(_windows) => { self.recv_window_frame(frame)?; @@ -273,7 +311,7 @@ impl ConnManager { // Priority is no longer recommended, so keep it compatible but not processed. Payload::Priority(_priority) => {} } - Ok(()) + Poll::Ready(Ok(())) } fn recv_settings_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> { @@ -342,93 +380,116 @@ impl ConnManager { } } - fn recv_go_away_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> { + fn recv_go_away_frame( + &mut self, + cx: &mut Context<'_>, + frame: Frame, + ) -> Poll> { let go_away = if let Payload::Goaway(goaway) = frame.payload() { goaway } else { // this will not happen forever. - return Ok(()); + return Poll::Ready(Ok(())); }; // Prevents the current connection from generating a new stream. self.controller.shutdown(); self.req_rx.close(); let last_stream_id = go_away.get_last_stream_id(); - let streams = self - .controller - .go_away_unsent_stream(last_stream_id as u32)?; + let streams = self.controller.get_unsent_streams(last_stream_id as u32)?; let error = H2Error::ConnectionError(ErrorCode::try_from(go_away.get_error_code())?); + + let mut blocked = false; for stream_id in streams { - self.controller - .send_message_to_stream(stream_id, RespMessage::OutputExit(error.clone().into())); + match self.controller.send_message_to_stream( + cx, + stream_id, + RespMessage::OutputExit(error.into()), + ) { + // ignore error when going away. + Poll::Ready(_) => {} + Poll::Pending => { + blocked = true; + } + } } // Exit after the allowed stream is complete. - self.controller.go_away = Some(go_away.get_error_code()); - Ok(()) + self.controller.recved_go_away = Some(go_away.get_error_code()); + if blocked { + Poll::Pending + } else { + Poll::Ready(Ok(())) + } } - fn recv_reset_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> { + fn recv_reset_frame( + &mut self, + cx: &mut Context<'_>, + frame: Frame, + ) -> Poll> { match self .controller .streams .recv_remote_reset(frame.stream_id() as u32) { - StreamEndState::OK => { - self.controller - .send_message_to_stream(frame.stream_id() as u32, RespMessage::Output(frame)); - } - StreamEndState::Err(e) => { - return Err(e.into()); - } - _ => {} + StreamEndState::OK => self.controller.send_message_to_stream( + cx, + frame.stream_id() as u32, + RespMessage::Output(frame), + ), + StreamEndState::Err(e) => Poll::Ready(Err(e)), + StreamEndState::Ignore => Poll::Ready(Ok(())), } - Ok(()) } - fn recv_header_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> { + fn recv_header_frame( + &mut self, + cx: &mut Context<'_>, + frame: Frame, + ) -> Poll> { match self .controller .streams .recv_headers(frame.stream_id() as u32, frame.flags().is_end_stream()) { - FrameRecvState::OK => { - self.controller - .send_message_to_stream(frame.stream_id() as u32, RespMessage::Output(frame)); - } - FrameRecvState::Err(e) => { - return Err(e.into()); - } - _ => {} + FrameRecvState::OK => self.controller.send_message_to_stream( + cx, + frame.stream_id() as u32, + RespMessage::Output(frame), + ), + FrameRecvState::Err(e) => Poll::Ready(Err(e)), + FrameRecvState::Ignore => Poll::Ready(Ok(())), } - Ok(()) } - fn recv_data_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> { + fn recv_data_frame(&mut self, cx: &mut Context<'_>, frame: Frame) -> Poll> { let data = if let Payload::Data(data) = frame.payload() { data } else { // this will not happen forever. - return Ok(()); + return Poll::Ready(Ok(())); }; let id = frame.stream_id() as u32; let len = data.size() as u32; + + self.controller.streams.release_conn_recv_window(len)?; + self.controller + .streams + .release_stream_recv_window(id, len)?; + match self .controller .streams .recv_data(id, frame.flags().is_end_stream()) { - FrameRecvState::OK => { - self.controller - .send_message_to_stream(frame.stream_id() as u32, RespMessage::Output(frame)); - } - FrameRecvState::Ignore => {} - FrameRecvState::Err(e) => return Err(e.into()), + FrameRecvState::OK => self.controller.send_message_to_stream( + cx, + frame.stream_id() as u32, + RespMessage::Output(frame), + ), + FrameRecvState::Ignore => Poll::Ready(Ok(())), + FrameRecvState::Err(e) => Poll::Ready(Err(e)), } - self.controller.streams.release_conn_recv_window(len)?; - self.controller - .streams - .release_stream_recv_window(id, len)?; - Ok(()) } fn recv_window_frame(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> { @@ -453,27 +514,35 @@ impl ConnManager { Ok(()) } - fn manage_resp_error(&mut self, kind: DispatchErrorKind) -> Result<(), DispatchErrorKind> { + fn manage_resp_error( + &mut self, + cx: &mut Context<'_>, + kind: DispatchErrorKind, + ) -> Poll> { match kind { - DispatchErrorKind::H2(h2) => { - match h2 { - H2Error::StreamError(id, code) => { - self.manage_stream_error(id, code)?; - } - H2Error::ConnectionError(code) => { - self.manage_conn_error(code)?; - } - } - Ok(()) - } + DispatchErrorKind::H2(h2) => match h2 { + H2Error::StreamError(id, code) => self.manage_stream_error(cx, id, code), + H2Error::ConnectionError(code) => self.manage_conn_error(cx, code), + }, other => { - self.exit_with_error(other.clone()); - Err(other) + let blocked = self.exit_with_error(cx, other); + if blocked { + self.state = ManagerState::Send; + self.next_state = ManagerState::Exit(other); + Poll::Pending + } else { + Poll::Ready(Err(other)) + } } } } - fn manage_stream_error(&mut self, id: u32, code: ErrorCode) -> Result<(), DispatchErrorKind> { + fn manage_stream_error( + &mut self, + cx: &mut Context<'_>, + id: u32, + code: ErrorCode, + ) -> Poll> { let rest_payload = RstStream::new(code.into_code()); let frame = Frame::new( id as usize, @@ -486,29 +555,42 @@ impl ConnManager { .send(frame) .map_err(|_e| DispatchErrorKind::ChannelClosed)?; - self.controller.send_message_to_stream( + match self.controller.send_message_to_stream( + cx, id, RespMessage::OutputExit(DispatchErrorKind::ChannelClosed), - ); + ) { + Poll::Ready(_) => { + // error at the stream level due to early exit of the coroutine in which the + // request is located, ignored to avoid manager coroutine exit. + Poll::Ready(Ok(())) + } + Poll::Pending => { + self.state = ManagerState::Send; + // stream error will not cause manager exit with error(exit state). Takes + // effect only if blocked. + self.next_state = ManagerState::Receive; + Poll::Pending + } + } } - StreamEndState::Ignore => {} + StreamEndState::Ignore => Poll::Ready(Ok(())), StreamEndState::Err(e) => { // This error will never happen. - return Err(e.into()); + Poll::Ready(Err(e.into())) } } - Ok(()) } - fn manage_conn_error(&mut self, code: ErrorCode) -> Result<(), DispatchErrorKind> { - self.exit_with_error(DispatchErrorKind::H2(H2Error::ConnectionError( - code.clone(), - ))); - - // last_stream_id is set to 0 to ensure that all streams are + fn manage_conn_error( + &mut self, + cx: &mut Context<'_>, + code: ErrorCode, + ) -> Poll> { + // last_stream_id is set to 0 to ensure that all pushed streams are // shutdown. let go_away_payload = Goaway::new( - code.clone().into_code(), + code.into_code(), self.controller.streams.latest_remote_id as usize, vec![], ); @@ -517,22 +599,32 @@ impl ConnManager { FrameFlags::empty(), Payload::Goaway(go_away_payload.clone()), ); + // Avoid sending the same GO_AWAY frame multiple times. if let Some(ref go_away) = self.controller.go_away_sync.going_away { if go_away.get_error_code() == go_away_payload.get_error_code() && go_away.get_last_stream_id() == go_away_payload.get_last_stream_id() { - return Ok(()); + return Poll::Ready(Ok(())); } } - // Avoid sending the same GO_AWAY frame multiple times. self.controller.go_away_sync.going_away = Some(go_away_payload); self.input_tx .send(frame) .map_err(|_e| DispatchErrorKind::ChannelClosed)?; - // TODO When the current client has an error, - // it always sends the GO_AWAY frame at the first time and exits directly. - // Should we consider letting part of the unfinished stream complete? - Err(H2Error::ConnectionError(code).into()) + + let blocked = + self.exit_with_error(cx, DispatchErrorKind::H2(H2Error::ConnectionError(code))); + + if blocked { + self.state = ManagerState::Send; + self.next_state = ManagerState::Exit(H2Error::ConnectionError(code).into()); + Poll::Pending + } else { + // TODO When current client has an error, + // it always sends the GO_AWAY frame at the first time and exits directly. + // Should we consider letting part of the unfinished stream complete? + Poll::Ready(Err(H2Error::ConnectionError(code).into())) + } } fn poll_deal_with_go_away(&mut self, error_code: u32) -> Result<(), DispatchErrorKind> { @@ -583,18 +675,71 @@ impl ConnManager { Ok(()) } - fn poll_recv_message(&mut self, frame: Frame) -> Result<(), DispatchErrorKind> { - if let Err(kind) = self.poll_recv_frame(frame) { - self.manage_resp_error(kind)?; + fn poll_recv_message( + &mut self, + cx: &mut Context<'_>, + frame: Frame, + ) -> Poll> { + match self.poll_recv_frame(cx, frame) { + Poll::Ready(Err(kind)) => self.manage_resp_error(cx, kind), + Poll::Pending => { + self.state = ManagerState::Send; + self.next_state = ManagerState::Receive; + Poll::Pending + } + Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), } - Ok(()) } - pub(crate) fn exit_with_error(&mut self, error: DispatchErrorKind) { + fn poll_channel_closed_exit( + &mut self, + cx: &mut Context<'_>, + ) -> Poll> { + if self.exit_with_error(cx, DispatchErrorKind::ChannelClosed) { + self.state = ManagerState::Send; + self.next_state = ManagerState::Exit(DispatchErrorKind::ChannelClosed); + Poll::Pending + } else { + Poll::Ready(Err(DispatchErrorKind::ChannelClosed)) + } + } + + fn poll_blocked_frames(&mut self, cx: &mut Context<'_>) -> Poll<()> { + match self.controller.poll_blocked_message(cx, &self.input_tx) { + Poll::Ready(_) => { + self.state = self.next_state; + // Reset state. + self.next_state = ManagerState::Receive; + Poll::Ready(()) + } + Poll::Pending => Poll::Pending, + } + } + + pub(crate) fn exit_with_error( + &mut self, + cx: &mut Context<'_>, + error: DispatchErrorKind, + ) -> bool { self.controller.shutdown(); self.req_rx.close(); - self.controller - .streams - .go_away_all_streams(&mut self.controller.senders, error); + self.controller.streams.clear_streams_states(); + + let ids = self.controller.streams.get_all_unclosed_streams(); + let mut blocked = false; + for stream_id in ids { + match self.controller.send_message_to_stream( + cx, + stream_id, + RespMessage::OutputExit(error), + ) { + // ignore error when going away. + Poll::Ready(_) => {} + Poll::Pending => { + blocked = true; + } + } + } + blocked } } diff --git a/ylong_http_client/src/util/h2/mod.rs b/ylong_http_client/src/util/h2/mod.rs index 50feae9..0916468 100644 --- a/ylong_http_client/src/util/h2/mod.rs +++ b/ylong_http_client/src/util/h2/mod.rs @@ -38,6 +38,6 @@ pub(crate) use input::SendData; pub(crate) use io::{split, Reader, Writer}; pub(crate) use manager::ConnManager; pub(crate) use output::RecvData; -pub(crate) use streams::{RequestWrapper, Streams}; +pub(crate) use streams::{H2StreamState, RequestWrapper, StreamEndState, Streams}; pub const MAX_FLOW_CONTROL_WINDOW: u32 = (1 << 31) - 1; diff --git a/ylong_http_client/src/util/h2/output.rs b/ylong_http_client/src/util/h2/output.rs index a6c31d2..366ae2f 100644 --- a/ylong_http_client/src/util/h2/output.rs +++ b/ylong_http_client/src/util/h2/output.rs @@ -19,19 +19,33 @@ use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use ylong_http::h2::{ - ErrorCode, Frame, FrameDecoder, FrameKind, Frames, H2Error, Payload, Setting, + ErrorCode, Frame, FrameDecoder, FrameKind, FramesIntoIter, H2Error, Payload, Setting, }; -use crate::runtime::{AsyncRead, ReadBuf, ReadHalf, UnboundedSender}; +use crate::runtime::{AsyncRead, BoundedSender, ReadBuf, ReadHalf, SendError}; use crate::util::dispatcher::http2::{ DispatchErrorKind, OutputMessage, SettingsState, SettingsSync, }; +pub(crate) type OutputSendFut = + Pin>> + Send + Sync>>; + +#[derive(Copy, Clone)] +enum DecodeState { + Read, + Send, + Exit(DispatchErrorKind), +} + pub(crate) struct RecvData { decoder: FrameDecoder, settings: Arc>, reader: ReadHalf, - resp_tx: UnboundedSender, + state: DecodeState, + next_state: DecodeState, + resp_tx: BoundedSender, + curr_message: Option, + pending_iter: Option, } impl Future for RecvData { @@ -48,72 +62,170 @@ impl RecvData { decoder: FrameDecoder, settings: Arc>, reader: ReadHalf, - resp_tx: UnboundedSender, + resp_tx: BoundedSender, ) -> Self { Self { decoder, settings, reader, + state: DecodeState::Read, + next_state: DecodeState::Read, resp_tx, + curr_message: None, + pending_iter: None, } } fn poll_read_frame(&mut self, cx: &mut Context<'_>) -> Poll> { let mut buf = [0u8; 1024]; loop { - let mut read_buf = ReadBuf::new(&mut buf); - match Pin::new(&mut self.reader).poll_read(cx, &mut read_buf) { - Poll::Ready(Err(e)) => { - self.transmit_error(DispatchErrorKind::Disconnect)?; - return Poll::Ready(Err(e.into())); + match self.state { + DecodeState::Read => { + let mut read_buf = ReadBuf::new(&mut buf); + match Pin::new(&mut self.reader).poll_read(cx, &mut read_buf) { + Poll::Ready(Err(e)) => { + return self.transmit_error(cx, e.into()); + } + Poll::Ready(Ok(())) => {} + Poll::Pending => { + return Poll::Pending; + } + } + let read = read_buf.filled().len(); + if read == 0 { + return self.transmit_error(cx, DispatchErrorKind::Disconnect); + } + + match self.decoder.decode(&buf[..read]) { + Ok(frames) => match self.poll_iterator_frames(cx, frames.into_iter()) { + Poll::Ready(Ok(_)) => {} + Poll::Ready(Err(e)) => { + return Poll::Ready(Err(e)); + } + Poll::Pending => { + self.next_state = DecodeState::Read; + } + }, + Err(e) => { + match self.transmit_message(cx, OutputMessage::OutputExit(e.into())) { + Poll::Ready(Err(_)) => { + return Poll::Ready(Err(DispatchErrorKind::ChannelClosed)) + } + Poll::Ready(Ok(_)) => {} + Poll::Pending => { + self.next_state = DecodeState::Read; + return Poll::Pending; + } + } + } + } } - Poll::Ready(Ok(())) => {} - Poll::Pending => { - return Poll::Pending; + DecodeState::Send => { + match self.poll_blocked_task(cx) { + Poll::Ready(Ok(_)) => { + self.state = self.next_state; + // Reset next state. + self.next_state = DecodeState::Read; + } + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Pending => return Poll::Pending, + } + } + DecodeState::Exit(e) => { + return Poll::Ready(Err(e)); } } - let read = read_buf.filled().len(); - if read == 0 { - self.transmit_error(DispatchErrorKind::Disconnect)?; - return Poll::Ready(Err(DispatchErrorKind::Disconnect)); - } + } + } - match self.decoder.decode(&buf[..read]) { - Ok(frames) => match self.transmit_frame(frames) { - Ok(_) => {} - Err(DispatchErrorKind::H2(e)) => { - self.transmit_error(e.into())?; - } - Err(e) => { - return Poll::Ready(Err(e)); - } - }, - Err(e) => { - self.transmit_error(e.into())?; + fn poll_blocked_task(&mut self, cx: &mut Context<'_>) -> Poll> { + if let Some(mut task) = self.curr_message.take() { + match task.as_mut().poll(cx) { + Poll::Ready(Ok(_)) => {} + Poll::Ready(Err(_)) => { + return Poll::Ready(Err(DispatchErrorKind::ChannelClosed)); + } + Poll::Pending => { + self.curr_message = Some(task); + return Poll::Pending; } } } + + if let Some(iter) = self.pending_iter.take() { + return self.poll_iterator_frames(cx, iter); + } + Poll::Ready(Ok(())) } - fn transmit_frame(&mut self, frames: Frames) -> Result<(), DispatchErrorKind> { - for kind in frames.into_iter() { + fn poll_iterator_frames( + &mut self, + cx: &mut Context<'_>, + mut iter: FramesIntoIter, + ) -> Poll> { + while let Some(kind) = iter.next() { match kind { FrameKind::Complete(frame) => { - self.update_settings(&frame)?; - self.resp_tx - .send(OutputMessage::Output(frame)) - .map_err(|_e| DispatchErrorKind::ChannelClosed)?; + // TODO Whether to continue processing the remaining frames after connection + // error occurs in the Settings frame. + let message = if let Err(e) = self.update_settings(&frame) { + OutputMessage::OutputExit(DispatchErrorKind::H2(e)) + } else { + OutputMessage::Output(frame) + }; + + match self.transmit_message(cx, message) { + Poll::Ready(Ok(_)) => {} + Poll::Ready(Err(e)) => { + return Poll::Ready(Err(e)); + } + Poll::Pending => { + self.pending_iter = Some(iter); + return Poll::Pending; + } + } } FrameKind::Partial => {} } } - Ok(()) + Poll::Ready(Ok(())) } - fn transmit_error(&self, err: DispatchErrorKind) -> Result<(), DispatchErrorKind> { - self.resp_tx - .send(OutputMessage::OutputExit(err)) - .map_err(|_e| DispatchErrorKind::ChannelClosed) + fn transmit_error( + &mut self, + cx: &mut Context<'_>, + exit_err: DispatchErrorKind, + ) -> Poll> { + match self.transmit_message(cx, OutputMessage::OutputExit(exit_err)) { + Poll::Ready(_) => Poll::Ready(Err(exit_err)), + Poll::Pending => { + self.next_state = DecodeState::Exit(exit_err); + Poll::Pending + } + } + } + + fn transmit_message( + &mut self, + cx: &mut Context<'_>, + message: OutputMessage, + ) -> Poll> { + let mut task = { + let sender = self.resp_tx.clone(); + let ft = async move { sender.send(message).await }; + Box::pin(ft) + }; + + match task.as_mut().poll(cx) { + Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), + // The current coroutine sending the request exited prematurely. + Poll::Ready(Err(_)) => Poll::Ready(Err(DispatchErrorKind::ChannelClosed)), + Poll::Pending => { + self.state = DecodeState::Send; + self.curr_message = Some(task); + Poll::Pending + } + } } fn update_settings(&mut self, frame: &Frame) -> Result<(), H2Error> { diff --git a/ylong_http_client/src/util/h2/streams.rs b/ylong_http_client/src/util/h2/streams.rs index fad3f95..d80a340 100644 --- a/ylong_http_client/src/util/h2/streams.rs +++ b/ylong_http_client/src/util/h2/streams.rs @@ -20,12 +20,12 @@ use std::task::{Context, Poll}; use ylong_http::h2::{Data, ErrorCode, Frame, FrameFlags, H2Error, Payload}; use crate::runtime::UnboundedSender; -use crate::util::dispatcher::http2::{DispatchErrorKind, RespMessage}; +use crate::util::dispatcher::http2::DispatchErrorKind; use crate::util::h2::buffer::{FlowControl, RecvWindow, SendWindow}; use crate::util::h2::data_ref::BodyDataRef; -const INITIAL_MAX_SEND_STREAM_ID: u32 = u32::MAX >> 1; -const INITIAL_MAX_RECV_STREAM_ID: u32 = u32::MAX >> 1; +pub(crate) const INITIAL_MAX_SEND_STREAM_ID: u32 = u32::MAX >> 1; +pub(crate) const INITIAL_MAX_RECV_STREAM_ID: u32 = u32::MAX >> 1; const INITIAL_LATEST_REMOTE_ID: u32 = 0; const DEFAULT_MAX_CONCURRENT_STREAMS: u32 = 100; @@ -77,7 +77,7 @@ pub(crate) enum StreamEndState { // | recv R | closed | recv R | // `----------------------->| |<----------------------' // +--------+ -#[derive(Clone, Debug)] +#[derive(Copy, Clone, Debug)] pub(crate) enum H2StreamState { Idle, // When response does not depend on request, @@ -97,7 +97,7 @@ pub(crate) enum H2StreamState { Closed(CloseReason), } -#[derive(Clone, Debug)] +#[derive(Copy, Clone, Debug)] pub(crate) enum CloseReason { LocalRst, RemoteRst, @@ -106,7 +106,7 @@ pub(crate) enum CloseReason { EndStream, } -#[derive(Clone, Debug)] +#[derive(Copy, Clone, Debug)] pub(crate) enum ActiveState { WaitHeaders, WaitData, @@ -128,7 +128,7 @@ pub(crate) struct RequestWrapper { pub(crate) struct Streams { // Records the received goaway last_stream_id. pub(crate) max_send_id: u32, - // Records the sent goaway last_stream_id. + // Records the send goaway last_stream_id. pub(crate) max_recv_id: u32, // Currently the client doesn't support push promise, so this value is always 0. pub(crate) latest_remote_id: u32, @@ -294,6 +294,10 @@ impl Streams { true } + pub(crate) fn stream_state(&self, id: u32) -> Option { + self.stream_map.get(&id).map(|stream| stream.state) + } + pub(crate) fn insert(&mut self, id: u32, request: RequestWrapper) { let send_window = SendWindow::new(self.stream_send_window_size as i32); let recv_window = RecvWindow::new(self.stream_recv_window_size as i32); @@ -310,10 +314,14 @@ impl Streams { self.pending_concurrency.push_back(id); } - pub(crate) fn next_stream(&mut self) -> Option { + pub(crate) fn next_pending_stream(&mut self) -> Option { self.pending_send.pop_front() } + pub(crate) fn pending_stream_num(&self) -> usize { + self.pending_send.len() + } + pub(crate) fn try_consume_pending_concurrency(&mut self) { while !self.reach_max_concurrency() { match self.pending_concurrency.pop_front() { @@ -489,6 +497,7 @@ impl Streams { for (id, unsent_stream) in self.stream_map.iter_mut() { if *id >= last_stream_id { match unsent_stream.state { + // TODO Whether the close state needs to be selected. H2StreamState::Closed(_) => {} H2StreamState::Idle => { unsent_stream.state = H2StreamState::Closed(CloseReason::RemoteGoAway); @@ -508,11 +517,8 @@ impl Streams { ids } - pub(crate) fn go_away_all_streams( - &mut self, - senders: &mut HashMap>, - error: DispatchErrorKind, - ) { + pub(crate) fn get_all_unclosed_streams(&mut self) -> Vec { + let mut ids = vec![]; for (id, stream) in self.stream_map.iter_mut() { match stream.state { H2StreamState::Closed(_) => {} @@ -520,12 +526,14 @@ impl Streams { stream.header = None; stream.data.clear(); stream.state = H2StreamState::Closed(CloseReason::LocalGoAway); - if let Some(sender) = senders.get_mut(id) { - sender.send(RespMessage::OutputExit(error.clone())).ok(); - } + ids.push(*id); } } } + ids + } + + pub(crate) fn clear_streams_states(&mut self) { self.window_updating_streams.clear(); self.pending_stream_window.clear(); self.pending_send.clear(); @@ -577,11 +585,11 @@ impl Streams { recv, } => { stream.state = if eos { - H2StreamState::LocalHalfClosed(recv.clone()) + H2StreamState::LocalHalfClosed(*recv) } else { H2StreamState::Open { send: ActiveState::WaitData, - recv: recv.clone(), + recv: *recv, } }; } @@ -610,7 +618,7 @@ impl Streams { recv, } => { if eos { - stream.state = H2StreamState::LocalHalfClosed(recv.clone()); + stream.state = H2StreamState::LocalHalfClosed(*recv); } } H2StreamState::RemoteHalfClosed(ActiveState::WaitData) => { @@ -698,7 +706,7 @@ impl Streams { recv: ActiveState::WaitData, } => { if eos { - stream.state = H2StreamState::RemoteHalfClosed(send.clone()); + stream.state = H2StreamState::RemoteHalfClosed(*send); } } H2StreamState::LocalHalfClosed(ActiveState::WaitData) => { -- Gitee From 9a0ed16a124951ffddd13b804d6af0b5b6272fde Mon Sep 17 00:00:00 2001 From: Tiga Ultraman Date: Mon, 29 Jul 2024 20:56:19 +0800 Subject: [PATCH 2/2] http2 hpack add huffman encode Signed-off-by: Tiga Ultraman --- ylong_http/src/h2/encoder.rs | 68 +++++++++---------- ylong_http/src/h2/hpack/encoder.rs | 20 ++++-- .../src/h2/hpack/representation/encoder.rs | 52 ++++++++++---- .../examples/async_https_outside.rs | 22 ++---- ylong_http_client/src/async_impl/client.rs | 23 +++++-- ylong_http_client/src/util/config/http.rs | 22 ++++-- ylong_http_client/src/util/dispatcher.rs | 30 ++++---- ylong_http_client/src/util/h2/manager.rs | 19 +++++- 8 files changed, 154 insertions(+), 102 deletions(-) diff --git a/ylong_http/src/h2/encoder.rs b/ylong_http/src/h2/encoder.rs index ee5c14d..adfbef3 100644 --- a/ylong_http/src/h2/encoder.rs +++ b/ylong_http/src/h2/encoder.rs @@ -21,6 +21,8 @@ use crate::h2::{Frame, Goaway, HpackEncoder, Settings}; // Frame_size_error/Protocol Error. const DEFAULT_MAX_FRAME_SIZE: usize = 16384; +const DEFAULT_HEADER_TABLE_SIZE: usize = 4096; + #[derive(Debug)] pub enum FrameEncoderErr { EncodingData, @@ -42,6 +44,7 @@ enum FrameEncoderState { EncodingHeadersPayload, // The state for encoding the padding octets for the HEADERS frame, if the PADDED flag is set. EncodingHeadersPadding, + // TODO compare to max_header_list_size // The state for encoding CONTINUATION frames if the header block exceeds the max_frame_size. EncodingContinuationFrames, // The final state, indicating that the HEADERS frame and any necessary CONTINUATION frames @@ -109,12 +112,12 @@ pub struct FrameEncoder { impl FrameEncoder { /// Constructs a new `FrameEncoder` with specified maximum frame size and /// maximum header list size. - pub fn new(max_frame_size: usize, max_header_list_size: usize) -> Self { + pub fn new(max_frame_size: usize, use_huffman: bool) -> Self { FrameEncoder { current_frame: None, max_frame_size, - max_header_list_size, - hpack_encoder: HpackEncoder::with_max_size(max_header_list_size), + max_header_list_size: usize::MAX, + hpack_encoder: HpackEncoder::new(DEFAULT_HEADER_TABLE_SIZE, use_huffman), state: FrameEncoderState::Idle, encoded_bytes: 0, data_offset: 0, @@ -343,9 +346,14 @@ impl FrameEncoder { } /// Sets the maximum header table size for the current encoder instance. + // TODO enable update header table size. pub fn update_header_table_size(&mut self, size: usize) { + self.hpack_encoder.update_max_dynamic_table_size(size) + } + + // TODO enable update max header list size. + pub(crate) fn update_max_header_list_size(&mut self, size: usize) { self.max_header_list_size = size; - self.hpack_encoder = HpackEncoder::with_max_size(self.max_header_list_size) } fn finish_current_frame(&mut self) { @@ -1328,7 +1336,7 @@ mod ut_frame_encoder { /// 5. Checks whether the result is correct. #[test] fn ut_data_frame_encoding() { - let mut encoder = FrameEncoder::new(4096, 4096); + let mut encoder = FrameEncoder::new(4096, false); let data_payload = b"hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh".to_vec(); let data_frame = Frame::new( @@ -1367,7 +1375,7 @@ mod ut_frame_encoder { /// 5. Checks whether the result is correct. #[test] fn ut_headers_frame_encoding() { - let mut frame_encoder = FrameEncoder::new(4096, 8190); + let mut frame_encoder = FrameEncoder::new(4096, false); let mut new_parts = Parts::new(); new_parts.pseudo.set_method(Some("GET".to_string())); @@ -1406,7 +1414,7 @@ mod ut_frame_encoder { /// 5. Checks whether the result is correct. #[test] fn ut_settings_frame_encoding() { - let mut encoder = FrameEncoder::new(4096, 4096); + let mut encoder = FrameEncoder::new(4096, false); let settings_payload = vec![ Setting::HeaderTableSize(4096), Setting::EnablePush(true), @@ -1473,7 +1481,7 @@ mod ut_frame_encoder { /// 5. Checks whether the result is correct. #[test] fn ut_ping_frame_encoding() { - let mut encoder = FrameEncoder::new(4096, 4096); + let mut encoder = FrameEncoder::new(4096, false); let ping_payload = [1, 2, 3, 4, 5, 6, 7, 8]; let ping_frame = Frame::new( @@ -1525,7 +1533,7 @@ mod ut_frame_encoder { /// 4. Checks whether the encoding results are correct. #[test] fn ut_continue_frame_encoding() { - let mut encoder = FrameEncoder::new(4096, 8190); + let mut encoder = FrameEncoder::new(4096, false); let mut new_parts = Parts::new(); new_parts.pseudo.set_method(Some("GET".to_string())); @@ -1587,7 +1595,7 @@ mod ut_frame_encoder { /// 5. Checks whether the result is correct. #[test] fn ut_rst_stream_frame_encoding() { - let mut frame_encoder = FrameEncoder::new(4096, 8190); + let mut frame_encoder = FrameEncoder::new(4096, false); let error_code = 12345678; let rst_stream_payload = Payload::RstStream(RstStream::new(error_code)); @@ -1632,7 +1640,7 @@ mod ut_frame_encoder { /// 5. Checks whether the result is correct. #[test] fn ut_window_update_frame_encoding() { - let mut frame_encoder = FrameEncoder::new(4096, 8190); + let mut frame_encoder = FrameEncoder::new(4096, false); let window_size_increment = 12345678; let window_update_payload = Payload::WindowUpdate(WindowUpdate::new(window_size_increment)); @@ -1677,7 +1685,7 @@ mod ut_frame_encoder { /// 5. Checks whether the result is correct. #[test] fn ut_priority_frame_encoding() { - let mut encoder = FrameEncoder::new(4096, 4096); + let mut encoder = FrameEncoder::new(4096, false); // Maximum value for a 31-bit integer let stream_dependency = 0x7FFFFFFF; let priority_payload = Priority::new(true, stream_dependency, 15); @@ -1732,7 +1740,7 @@ mod ut_frame_encoder { #[test] fn ut_goaway_frame_encoding() { // 1. Creates a `FrameEncoder`. - let mut encoder = FrameEncoder::new(4096, 4096); + let mut encoder = FrameEncoder::new(4096, false); // 2. Creates a `Frame` with `Payload::Goaway`. let last_stream_id = 1; @@ -1782,24 +1790,11 @@ mod ut_frame_encoder { /// 3. Checks whether the maximum frame size was updated correctly. #[test] fn ut_update_max_frame_size() { - let mut encoder = FrameEncoder::new(4096, 4096); + let mut encoder = FrameEncoder::new(4096, false); encoder.update_max_frame_size(8192); assert_eq!(encoder.max_frame_size, 8192); } - /// UT test cases for `FrameEncoder::update_header_table_size`. - /// - /// # Brief - /// 1. Creates a `FrameEncoder`. - /// 2. Updates the maximum header table size. - /// 3. Checks whether the maximum header table size was updated correctly. - #[test] - fn ut_update_header_table_size() { - let mut encoder = FrameEncoder::new(4096, 4096); - encoder.update_header_table_size(8192); - assert_eq!(encoder.max_header_list_size, 8192); - } - /// UT test cases for `FrameEncoder::update_setting`. /// /// # Brief @@ -1811,7 +1806,7 @@ mod ut_frame_encoder { /// 6. Checks whether the setting was updated correctly. #[test] fn ut_update_setting() { - let mut encoder = FrameEncoder::new(4096, 4096); + let mut encoder = FrameEncoder::new(4096, false); let settings_payload = vec![Setting::MaxFrameSize(4096)]; let settings = Settings::new(settings_payload); let settings_frame = Frame::new(0, FrameFlags::new(0), Payload::Settings(settings)); @@ -1838,7 +1833,7 @@ mod ut_frame_encoder { /// 5. Checks whether the result is correct. #[test] fn ut_encode_continuation_frames() { - let mut frame_encoder = FrameEncoder::new(4096, 8190); + let mut frame_encoder = FrameEncoder::new(4096, false); let mut new_parts = Parts::new(); assert!(new_parts.is_empty()); new_parts.pseudo.set_method(Some("GET".to_string())); @@ -1849,7 +1844,7 @@ mod ut_frame_encoder { .set_authority(Some("example.com".to_string())); let mut frame_flag = FrameFlags::empty(); - frame_flag.set_end_headers(false); + frame_flag.set_end_headers(true); frame_flag.set_end_stream(false); let frame = Frame::new( 1, @@ -1863,7 +1858,8 @@ mod ut_frame_encoder { assert!(frame_encoder.encode_continuation_frames(&mut buf).is_ok()); - let frame_flag = FrameFlags::empty(); + let mut frame_flag = FrameFlags::empty(); + frame_flag.set_end_headers(true); let frame = Frame::new( 1, frame_flag, @@ -1874,7 +1870,8 @@ mod ut_frame_encoder { frame_encoder.state = FrameEncoderState::EncodingContinuationFrames; assert!(frame_encoder.encode_continuation_frames(&mut buf).is_ok()); - let frame_flag = FrameFlags::empty(); + let mut frame_flag = FrameFlags::empty(); + frame_flag.set_end_headers(true); let frame = Frame::new(1, frame_flag, Payload::Ping(Ping::new([0; 8]))); frame_encoder.set_frame(frame).unwrap(); @@ -1892,10 +1889,11 @@ mod ut_frame_encoder { /// 5. Checks whether the result is correct. #[test] fn ut_encode_padding() { - let mut frame_encoder = FrameEncoder::new(4096, 8190); + let mut frame_encoder = FrameEncoder::new(4096, false); // Creates a padded data frame. let mut frame_flags = FrameFlags::empty(); + frame_flags.set_end_headers(true); frame_flags.set_padded(true); let data_payload = vec![0u8; 500]; let data_frame = Frame::new( @@ -1931,7 +1929,7 @@ mod ut_frame_encoder { /// 5. Checks whether the result is correct. #[test] fn ut_encode_small_data_frame() { - let mut encoder = FrameEncoder::new(100, 4096); + let mut encoder = FrameEncoder::new(100, false); let data_payload = vec![b'a'; 10]; let mut buf = [0u8; 10]; encode_small_frame(&mut encoder, &mut buf, data_payload.clone()); @@ -1971,7 +1969,7 @@ mod ut_frame_encoder { /// 5. Checks whether the result is correct. #[test] fn ut_encode_large_data_frame() { - let mut encoder = FrameEncoder::new(100, 4096); + let mut encoder = FrameEncoder::new(100, false); let data_payload = vec![b'a'; 1024]; let mut buf = [0u8; 10]; diff --git a/ylong_http/src/h2/hpack/encoder.rs b/ylong_http/src/h2/hpack/encoder.rs index 3b2a71d..f6f7088 100644 --- a/ylong_http/src/h2/hpack/encoder.rs +++ b/ylong_http/src/h2/hpack/encoder.rs @@ -22,17 +22,23 @@ use crate::h2::{Parts, PseudoHeaders}; pub(crate) struct HpackEncoder { table: DynamicTable, holder: ReprEncStateHolder, + use_huffman: bool, } impl HpackEncoder { - /// Create a `HpackEncoder` with the given max size. - pub(crate) fn with_max_size(max_size: usize) -> Self { + /// Create a `HpackEncoder` with the given max dynamic table size and + /// huffman usage. + pub(crate) fn new(max_size: usize, use_huffman: bool) -> Self { Self { table: DynamicTable::with_max_size(max_size), holder: ReprEncStateHolder::new(), + use_huffman, } } + // TODO enable update header_table_size + pub(crate) fn update_max_dynamic_table_size(&self, _max_size: usize) {} + /// Set the `Parts` to be encoded. pub(crate) fn set_parts(&mut self, parts: Parts) { self.holder.set_parts(parts) @@ -43,7 +49,7 @@ impl HpackEncoder { pub(crate) fn encode(&mut self, dst: &mut [u8]) -> usize { let mut encoder = ReprEncoder::new(&mut self.table); encoder.load(&mut self.holder); - let size = encoder.encode(dst); + let size = encoder.encode(dst, self.use_huffman); if size == dst.len() { encoder.save(&mut self.holder); } @@ -91,7 +97,7 @@ mod ut_hpack_encoder { fn rfc7541_test_cases() { // C.2.1. Literal Header Field with Indexing hpack_test_cases!( - HpackEncoder::with_max_size(4096), + HpackEncoder::new(4096, false), 26, "400a637573746f6d2d6b65790d637573746f6d2d686561646572", 55, { Header::Other(String::from("custom-key")), @@ -104,7 +110,7 @@ mod ut_hpack_encoder { // C.2.4. Indexed Header Field hpack_test_cases!( - HpackEncoder::with_max_size(4096), + HpackEncoder::new(4096, false), 1, "82", 0, { Header::Method, @@ -114,7 +120,7 @@ mod ut_hpack_encoder { // C.3. Request Examples without Huffman Coding { - let mut encoder = HpackEncoder::with_max_size(4096); + let mut encoder = HpackEncoder::new(4096, false); // C.3.1. First Request hpack_test_cases!( &mut encoder, @@ -172,7 +178,7 @@ mod ut_hpack_encoder { // C.5. Response Examples without Huffman Coding { - let mut encoder = HpackEncoder::with_max_size(256); + let mut encoder = HpackEncoder::new(256, false); // C.5.1. First Response hpack_test_cases!( &mut encoder, diff --git a/ylong_http/src/h2/hpack/representation/encoder.rs b/ylong_http/src/h2/hpack/representation/encoder.rs index cdd9fd2..fbff588 100644 --- a/ylong_http/src/h2/hpack/representation/encoder.rs +++ b/ylong_http/src/h2/hpack/representation/encoder.rs @@ -19,6 +19,7 @@ use crate::h2::hpack::representation::PrefixIndexMask; use crate::h2::hpack::table::{DynamicTable, Header, TableIndex, TableSearcher}; use crate::h2::{Parts, PseudoHeaders}; use crate::headers::HeadersIntoIter; +use crate::huffman::huffman_encode; /// Encoder implementation for decoding representation. The encode interface /// supports segmented writing. @@ -61,7 +62,7 @@ impl<'a> ReprEncoder<'a> { /// Decoding is complete only when `self.iter` and `self.state` are both /// `None`. It is recommended that users save the result to a /// `RecEncStateHolder` immediately after using the method. - pub(crate) fn encode(&mut self, dst: &mut [u8]) -> usize { + pub(crate) fn encode(&mut self, dst: &mut [u8], use_huffman: bool) -> usize { // If `dst` is empty, leave the state unchanged. if dst.is_empty() { return 0; @@ -92,13 +93,17 @@ impl<'a> ReprEncoder<'a> { Some(TableIndex::HeaderName(index)) => { // Update it to the dynamic table first, then decode it. self.table.update(h.clone(), v.clone()); - Indexing::new(index, v.into_bytes(), false).encode(&mut dst[cur..]) + Indexing::new(index, v.into_bytes(), use_huffman).encode(&mut dst[cur..]) } None => { // Update it to the dynamic table first, then decode it. self.table.update(h.clone(), v.clone()); - IndexingWithName::new(h.into_string().into_bytes(), v.into_bytes(), false) - .encode(&mut dst[cur..]) + IndexingWithName::new( + h.into_string().into_bytes(), + v.into_bytes(), + use_huffman, + ) + .encode(&mut dst[cur..]) } }; match result { @@ -426,8 +431,9 @@ impl IndexAndValue { } fn set_value(mut self, value: Vec, is_huffman: bool) -> Self { - self.value_length = Some(Integer::length(value.len(), is_huffman)); - self.value_octets = Some(Octets::new(value)); + let octets = Octets::new(value, is_huffman); + self.value_length = Some(Integer::length(octets.len(), is_huffman)); + self.value_octets = Some(octets); self } @@ -465,10 +471,12 @@ impl NameAndValue { } fn set_name_and_value(mut self, name: Vec, value: Vec, is_huffman: bool) -> Self { - self.name_length = Some(Integer::length(name.len(), is_huffman)); - self.name_octets = Some(Octets::new(name)); - self.value_length = Some(Integer::length(value.len(), is_huffman)); - self.value_octets = Some(Octets::new(value)); + let name_octets = Octets::new(name, is_huffman); + self.name_length = Some(Integer::length(name_octets.len(), is_huffman)); + self.name_octets = Some(name_octets); + let value_octets = Octets::new(value, is_huffman); + self.value_length = Some(Integer::length(value_octets.len(), is_huffman)); + self.value_octets = Some(value_octets); self } @@ -496,7 +504,7 @@ impl Integer { fn length(length: usize, is_huffman: bool) -> Self { Self { - int: IntegerEncoder::new(length, 0x7f, u8::from(is_huffman)), + int: IntegerEncoder::new(length, 0x7f, pre_mask(is_huffman)), } } @@ -520,8 +528,18 @@ pub(crate) struct Octets { } impl Octets { - fn new(src: Vec) -> Self { - Self { src, idx: 0 } + fn new(src: Vec, is_huffman: bool) -> Self { + if is_huffman { + let mut dst = Vec::with_capacity(src.len()); + huffman_encode(src.as_slice(), dst.as_mut()); + Self { src: dst, idx: 0 } + } else { + Self { src, idx: 0 } + } + } + + fn len(&self) -> usize { + self.src.len() } fn encode(mut self, dst: &mut [u8]) -> Result { @@ -549,6 +567,14 @@ impl Octets { } } +fn pre_mask(is_huffman: bool) -> u8 { + if is_huffman { + 0x80 + } else { + 0 + } +} + #[cfg(test)] mod ut_repre_encoder { use super::*; diff --git a/ylong_http_client/examples/async_https_outside.rs b/ylong_http_client/examples/async_https_outside.rs index 35dcb60..adca16c 100644 --- a/ylong_http_client/examples/async_https_outside.rs +++ b/ylong_http_client/examples/async_https_outside.rs @@ -14,43 +14,31 @@ //! This is a simple asynchronous HTTPS client example. use ylong_http_client::async_impl::{Body, Client, Downloader, Request}; -use ylong_http_client::{Certificate, HttpClientError, Redirect, TlsVersion}; +use ylong_http_client::{HttpClientError, Redirect, TlsVersion}; fn main() { let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .expect("Tokio runtime build err."); - let mut v = vec![]; - for _i in 0..3 { - let handle = rt.spawn(req()); - v.push(handle); - } + let handle = rt.spawn(req()); rt.block_on(async move { - for h in v { - let _ = h.await; - } + let _ = handle.await.unwrap().unwrap(); }); } async fn req() -> Result<(), HttpClientError> { - let v = "some certs".as_bytes(); - let cert = Certificate::from_pem(v)?; - // Creates a `async_impl::Client` let client = Client::builder() .redirect(Redirect::default()) - .tls_built_in_root_certs(false) // not use root certs - .danger_accept_invalid_certs(true) // not verify certs - .max_tls_version(TlsVersion::TLS_1_2) .min_tls_version(TlsVersion::TLS_1_2) - .add_root_certificate(cert) .build()?; // Creates a `Request`. let request = Request::builder() - .url("https://www.example.com") + .header("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36 Edg/126.0.0.0") + .url("http://vipspeedtest8.wuhan.net.cn:8080/download?size=1073741824") .body(Body::empty())?; // Sends request and receives a `Response`. diff --git a/ylong_http_client/src/async_impl/client.rs b/ylong_http_client/src/async_impl/client.rs index 658e3b3..d6eabc0 100644 --- a/ylong_http_client/src/async_impl/client.rs +++ b/ylong_http_client/src/async_impl/client.rs @@ -498,17 +498,32 @@ impl ClientBuilder { self } - /// Sets allowed max size of local cached frame. + /// Sets allowed max size of local cached frame, By default, 5 frames are + /// allowed per stream. /// /// # Examples /// /// ``` /// use ylong_http_client::async_impl::ClientBuilder; /// - /// let config = ClientBuilder::new().allow_cached_frame_num(10); + /// let config = ClientBuilder::new().allowed_cache_frame_size(10); /// ``` - pub fn allow_cached_frame_num(mut self, num: usize) -> Self { - self.http.http2_config.set_allow_cached_frame_num(num); + pub fn allowed_cache_frame_size(mut self, size: usize) -> Self { + self.http.http2_config.set_allowed_cache_frame_size(size); + self + } + + /// Sets whether to use huffman coding in hpack. The default is true. + /// + /// # Examples + /// + /// ``` + /// use ylong_http_client::async_impl::ClientBuilder; + /// + /// let config = ClientBuilder::new().use_huffman_coding(true); + /// ``` + pub fn use_huffman_coding(mut self, use_huffman: bool) -> Self { + self.http.http2_config.set_use_huffman_coding(use_huffman); self } diff --git a/ylong_http_client/src/util/config/http.rs b/ylong_http_client/src/util/config/http.rs index 2a237b6..7d0a723 100644 --- a/ylong_http_client/src/util/config/http.rs +++ b/ylong_http_client/src/util/config/http.rs @@ -75,7 +75,8 @@ pub(crate) mod http2 { init_conn_window_size: u32, init_stream_window_size: u32, enable_push: bool, - allow_cached_frame_num: usize, + allowed_cache_frame_size: usize, + use_huffman: bool, } impl H2Config { @@ -107,8 +108,12 @@ pub(crate) mod http2 { self.init_stream_window_size = size; } - pub(crate) fn set_allow_cached_frame_num(&mut self, num: usize) { - self.allow_cached_frame_num = num; + pub(crate) fn set_allowed_cache_frame_size(&mut self, size: usize) { + self.allowed_cache_frame_size = size; + } + + pub(crate) fn set_use_huffman_coding(&mut self, use_huffman: bool) { + self.use_huffman = use_huffman; } /// Gets the SETTINGS_MAX_FRAME_SIZE. @@ -138,8 +143,12 @@ pub(crate) mod http2 { self.init_stream_window_size } - pub(crate) fn allow_cached_frame_num(&self) -> usize { - self.allow_cached_frame_num + pub(crate) fn allowed_cache_frame_size(&self) -> usize { + self.allowed_cache_frame_size + } + + pub(crate) fn use_huffman_coding(&self) -> bool { + self.use_huffman } } @@ -152,7 +161,8 @@ pub(crate) mod http2 { init_conn_window_size: DEFAULT_CONN_WINDOW_SIZE, init_stream_window_size: DEFAULT_STREAM_WINDOW_SIZE, enable_push: false, - allow_cached_frame_num: 5, + allowed_cache_frame_size: 5, + use_huffman: true, } } } diff --git a/ylong_http_client/src/util/dispatcher.rs b/ylong_http_client/src/util/dispatcher.rs index ab5028d..82e3d72 100644 --- a/ylong_http_client/src/util/dispatcher.rs +++ b/ylong_http_client/src/util/dispatcher.rs @@ -176,7 +176,6 @@ pub(crate) mod http2 { const DEFAULT_MAX_STREAM_ID: u32 = u32::MAX >> 1; const DEFAULT_MAX_FRAME_SIZE: usize = 2 << 13; - const DEFAULT_MAX_HEADER_LIST_SIZE: usize = 16 << 20; const DEFAULT_WINDOW_SIZE: u32 = 65535; pub(crate) type ManagerSendFut = @@ -210,7 +209,7 @@ pub(crate) mod http2 { // threads according to HTTP2 syntax. pub(crate) struct Http2Dispatcher { pub(crate) next_stream_id: StreamId, - pub(crate) allow_cached_frames: usize, + pub(crate) allowed_cache: usize, pub(crate) sender: UnboundedSender, pub(crate) io_shutdown: Arc, pub(crate) handles: Vec>, @@ -305,18 +304,18 @@ pub(crate) mod http2 { let mut handles = Vec::with_capacity(3); if input_tx.send(settings).is_ok() { Self::launch( - config.allow_cached_frame_num(), + config.allowed_cache_frame_size(), + config.use_huffman_coding(), controller, + (input_tx, input_rx), req_rx, - input_tx, - input_rx, &mut handles, io, ); } Self { next_stream_id, - allow_cached_frames: config.allow_cached_frame_num(), + allowed_cache: config.allowed_cache_frame_size(), sender: req_tx, io_shutdown: shutdown_flag, handles, @@ -326,10 +325,10 @@ pub(crate) mod http2 { fn launch( allow_num: usize, + use_huffman: bool, controller: StreamController, + input_channel: (UnboundedSender, UnboundedReceiver), req_rx: UnboundedReceiver, - input_tx: UnboundedSender, - input_rx: UnboundedReceiver, handles: &mut Vec>, io: S, ) { @@ -340,9 +339,9 @@ pub(crate) mod http2 { let send = crate::runtime::spawn(async move { let mut writer = write; if async_send_preface(&mut writer).await.is_ok() { - let encoder = - FrameEncoder::new(DEFAULT_MAX_FRAME_SIZE, DEFAULT_MAX_HEADER_LIST_SIZE); - let mut send = SendData::new(encoder, send_settings_sync, writer, input_rx); + let encoder = FrameEncoder::new(DEFAULT_MAX_FRAME_SIZE, use_huffman); + let mut send = + SendData::new(encoder, send_settings_sync, writer, input_channel.1); let _ = Pin::new(&mut send).await; } }); @@ -358,7 +357,7 @@ pub(crate) mod http2 { let manager = crate::runtime::spawn(async move { let mut conn_manager = - ConnManager::new(settings_sync, input_tx, resp_rx, req_rx, controller); + ConnManager::new(settings_sync, input_channel.0, resp_rx, req_rx, controller); let _ = Pin::new(&mut conn_manager).await; }); handles.push(manager); @@ -374,12 +373,7 @@ pub(crate) mod http2 { return None; } let sender = self.sender.clone(); - let handle = Http2Conn::new( - id, - self.allow_cached_frames, - self.io_shutdown.clone(), - sender, - ); + let handle = Http2Conn::new(id, self.allowed_cache, self.io_shutdown.clone(), sender); Some(handle) } diff --git a/ylong_http_client/src/util/h2/manager.rs b/ylong_http_client/src/util/h2/manager.rs index 4b06c63..1ee01f1 100644 --- a/ylong_http_client/src/util/h2/manager.rs +++ b/ylong_http_client/src/util/h2/manager.rs @@ -48,6 +48,12 @@ pub(crate) struct ConnManager { // channel receiver between manager and stream coroutine. req_rx: UnboundedReceiver, controller: StreamController, + handshakes: HandShakes, +} + +struct HandShakes { + local: bool, + peer: bool, } impl Future for ConnManager { @@ -131,6 +137,10 @@ impl ConnManager { resp_rx, req_rx, controller, + handshakes: HandShakes { + local: false, + peer: false, + }, } } @@ -147,7 +157,9 @@ impl ConnManager { .streams .window_update_streams(&self.input_tx)?; self.poll_recv_request(cx)?; - self.poll_input_request(cx)?; + if self.handshakes.local && self.handshakes.peer { + self.poll_input_request(cx)?; + } Poll::Pending } @@ -335,6 +347,7 @@ impl ConnManager { } } connection.settings = SettingsState::Synced; + self.handshakes.local = true; Ok(()) } else { for setting in settings.get_settings() { @@ -358,7 +371,9 @@ impl ConnManager { ); self.input_tx .send(new_settings) - .map_err(|_e| DispatchErrorKind::ChannelClosed) + .map_err(|_e| DispatchErrorKind::ChannelClosed)?; + self.handshakes.peer = true; + Ok(()) } } -- Gitee