diff --git a/ylong_http_client/src/util/c_openssl/error.rs b/ylong_http_client/src/util/c_openssl/error.rs index 23f5018503f8d8f688caa15191f7dc527e11bbe4..389a27055069fedb5134736d6685ff93e67ac7c3 100644 --- a/ylong_http_client/src/util/c_openssl/error.rs +++ b/ylong_http_client/src/util/c_openssl/error.rs @@ -399,7 +399,7 @@ mod ut_stack_error { #[cfg(feature = "c_openssl_3_0")] assert_eq!( msg, - "error:00000001lib: (0), :func(0)reason(1), :\"TEST\":1:Test" + "error:00000001lib: (0), :func(0)reason: (1), :\"TEST\":1:Test" ); } @@ -431,7 +431,7 @@ mod ut_stack_error { #[cfg(feature = "c_openssl_3_0")] assert_eq!( msg, - "error:00000001lib: (0), :func(0)reason(1), :\"TEST\":1:Test" + "error:00000001lib: (0), :func(0)reason: (1), :\"TEST\":1:Test" ); } diff --git a/ylong_http_client/src/util/h2/manager.rs b/ylong_http_client/src/util/h2/manager.rs index ff355d38a4b5e50e14fd93922058b3fa6744b1a6..7829179e255d6ed879d8774961cc338c4833898b 100644 --- a/ylong_http_client/src/util/h2/manager.rs +++ b/ylong_http_client/src/util/h2/manager.rs @@ -152,10 +152,6 @@ impl ConnManager { if let Some(code) = self.controller.recved_go_away { self.poll_deal_with_go_away(code)?; } - self.controller.streams.window_update_conn(&self.input_tx)?; - self.controller - .streams - .window_update_streams(&self.input_tx)?; self.poll_recv_request(cx)?; if self.handshakes.local && self.handshakes.peer { self.poll_input_request(cx)?; @@ -309,7 +305,7 @@ impl ConnManager { return self.recv_header_frame(cx, frame).map_err(Into::into); } Payload::Data(_data) => { - return self.recv_data_frame(cx, frame).map_err(Into::into); + return self.recv_data_frame(cx, frame); } Payload::WindowUpdate(_windows) => { self.recv_window_frame(frame)?; @@ -467,7 +463,11 @@ impl ConnManager { } } - fn recv_data_frame(&mut self, cx: &mut Context<'_>, frame: Frame) -> Poll> { + fn recv_data_frame( + &mut self, + cx: &mut Context<'_>, + frame: Frame, + ) -> Poll> { let data = if let Payload::Data(data) = frame.payload() { data } else { @@ -477,23 +477,19 @@ impl ConnManager { let id = frame.stream_id(); let len = data.size() as u32; - self.controller.streams.release_conn_recv_window(len)?; - self.controller - .streams - .release_stream_recv_window(id, len)?; + self.update_window(id, len)?; match self .controller .streams .recv_data(id, frame.flags().is_end_stream()) { - FrameRecvState::OK => self.controller.send_message_to_stream( - cx, - frame.stream_id(), - RespMessage::Output(frame), - ), + FrameRecvState::OK => self + .controller + .send_message_to_stream(cx, frame.stream_id(), RespMessage::Output(frame)) + .map_err(Into::into), FrameRecvState::Ignore => Poll::Ready(Ok(())), - FrameRecvState::Err(e) => Poll::Ready(Err(e)), + FrameRecvState::Err(e) => Poll::Ready(Err(e.into())), } } @@ -742,4 +738,18 @@ impl ConnManager { } blocked } + + pub(crate) fn update_window( + &mut self, + id: StreamId, + len: u32, + ) -> Result<(), DispatchErrorKind> { + self.controller + .streams + .release_conn_recv_window(len, &self.input_tx)?; + self.controller + .streams + .release_stream_recv_window(id, len, &self.input_tx)?; + Ok(()) + } } diff --git a/ylong_http_client/src/util/h2/streams.rs b/ylong_http_client/src/util/h2/streams.rs index c7cffb8d0617639818266777017f2d744c03ac53..7aff74622aa7c095607732364031e779de106d5b 100644 --- a/ylong_http_client/src/util/h2/streams.rs +++ b/ylong_http_client/src/util/h2/streams.rs @@ -274,24 +274,43 @@ impl Streams { &mut self, id: StreamId, size: u32, - ) -> Result<(), H2Error> { + sender: &UnboundedSender, + ) -> Result<(), DispatchErrorKind> { if let Some(stream) = self.stream_map.get_mut(&id) { if stream.recv_window.notification_available() < size { - return Err(H2Error::StreamError(id, ErrorCode::FlowControlError)); + return Err(H2Error::StreamError(id, ErrorCode::FlowControlError).into()); } stream.recv_window.recv_data(size); + // determine whether it is necessary to update the stream window if stream.recv_window.unreleased_size().is_some() { - self.window_updating_streams.push_back(id); + if !stream.is_init_or_active_flow_control() { + return Ok(()); + } + if let Some(window_update) = stream.recv_window.check_window_update(id) { + sender + .send(window_update) + .map_err(|_e| DispatchErrorKind::ChannelClosed)?; + } } } Ok(()) } - pub(crate) fn release_conn_recv_window(&mut self, size: u32) -> Result<(), H2Error> { + pub(crate) fn release_conn_recv_window( + &mut self, + size: u32, + sender: &UnboundedSender, + ) -> Result<(), DispatchErrorKind> { if self.flow_control.recv_notification_size_available() < size { - return Err(H2Error::ConnectionError(ErrorCode::FlowControlError)); + return Err(H2Error::ConnectionError(ErrorCode::FlowControlError).into()); } self.flow_control.recv_data(size); + // determine whether it is necessary to update the connection window + if let Some(window_update) = self.flow_control.check_conn_recv_window_update() { + sender + .send(window_update) + .map_err(|_e| DispatchErrorKind::ChannelClosed)?; + } Ok(()) } @@ -385,41 +404,6 @@ impl Streams { Ok(()) } - pub(crate) fn window_update_conn( - &mut self, - sender: &UnboundedSender, - ) -> Result<(), DispatchErrorKind> { - if let Some(window_update) = self.flow_control.check_conn_recv_window_update() { - sender - .send(window_update) - .map_err(|_e| DispatchErrorKind::ChannelClosed)?; - } - Ok(()) - } - - pub(crate) fn window_update_streams( - &mut self, - sender: &UnboundedSender, - ) -> Result<(), DispatchErrorKind> { - loop { - match self.window_updating_streams.pop_front() { - None => return Ok(()), - Some(id) => { - if let Some(stream) = self.stream_map.get_mut(&id) { - if !stream.is_init_or_active_flow_control() { - return Ok(()); - } - if let Some(window_update) = stream.recv_window.check_window_update(id) { - sender - .send(window_update) - .map_err(|_e| DispatchErrorKind::ChannelClosed)?; - } - } - } - } - } - } - pub(crate) fn headers(&mut self, id: StreamId) -> Result, H2Error> { match self.stream_map.get_mut(&id) { None => Err(H2Error::ConnectionError(ErrorCode::IntervalError)),