diff --git a/ylong_http_client/src/async_impl/conn/http2.rs b/ylong_http_client/src/async_impl/conn/http2.rs index 7399e62a3ef99a837de0cbaa6374fd7bc9629a17..08a06316d238497e7499e4bd1bf9b53b2f5a703e 100644 --- a/ylong_http_client/src/async_impl/conn/http2.rs +++ b/ylong_http_client/src/async_impl/conn/http2.rs @@ -53,7 +53,8 @@ where let part = message.request.ref_mut().part().clone(); // TODO Implement trailer. - let (flag, payload) = build_headers_payload(part, false) + let is_end_stream = message.request.ref_mut().body().is_empty(); + let (flag, payload) = build_headers_payload(part, is_end_stream) .map_err(|e| HttpClientError::from_error(ErrorKind::Request, e))?; let data = BodyDataRef::new(message.request.clone()); let stream = RequestWrapper { diff --git a/ylong_http_client/src/async_impl/request.rs b/ylong_http_client/src/async_impl/request.rs index 8ef446ffc12d4f0b2143374fc66ba8787417b4b9..383764756d7098f1c9af7e540c5ab75a9eeb9237 100644 --- a/ylong_http_client/src/async_impl/request.rs +++ b/ylong_http_client/src/async_impl/request.rs @@ -350,6 +350,10 @@ impl Body { Self { inner } } + pub(crate) fn is_empty(&self) -> bool { + matches!(self.inner, BodyKind::Empty) + } + pub(crate) async fn reuse(&mut self) -> std::io::Result<()> { match self.inner { BodyKind::Empty => Ok(()), @@ -447,12 +451,13 @@ mod ut_client_request { let builder = RequestBuilder::default().append_header("name", "value"); let request = builder.body(Body::empty()); assert!(request.is_ok()); + assert!(request.unwrap().body().is_empty()); let request = RequestBuilder::default() .append_header("name", "value") .url("http://") .body(Body::empty()); - assert!(request.is_err()) + assert!(request.is_err()); } /// UT test cases for `RequestBuilder::body`. diff --git a/ylong_http_client/src/util/dispatcher.rs b/ylong_http_client/src/util/dispatcher.rs index fb0da81504492ed0d81dae7974e4a90778e4e5b4..8e37775c421e2e64714b59082a2308f0238ca2ae 100644 --- a/ylong_http_client/src/util/dispatcher.rs +++ b/ylong_http_client/src/util/dispatcher.rs @@ -345,22 +345,28 @@ pub(crate) mod http2 { S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static, { pub(crate) fn new(detail: ConnDetail, config: H2Config, io: S) -> Self { - let settings = create_initial_settings(&config); - let mut flow = FlowControl::new(DEFAULT_WINDOW_SIZE, DEFAULT_WINDOW_SIZE); flow.setup_recv_window(config.conn_window_size()); let streams = Streams::new(config.stream_window_size(), DEFAULT_WINDOW_SIZE, flow); let shutdown_flag = Arc::new(AtomicBool::new(false)); - let controller = StreamController::new(streams, shutdown_flag.clone()); + let mut controller = StreamController::new(streams, shutdown_flag.clone()); let (input_tx, input_rx) = unbounded_channel(); let (req_tx, req_rx) = unbounded_channel(); + let settings = create_initial_settings(&config); + // Error is not possible, so it is not handled for the time // being. let mut handles = Vec::with_capacity(3); - if input_tx.send(settings).is_ok() { + // send initial settings and update conn recv window + if input_tx.send(settings).is_ok() + && controller + .streams + .release_conn_recv_window(0, &input_tx) + .is_ok() + { Self::launch( config.allowed_cache_frame_size(), config.use_huffman_coding(), diff --git a/ylong_http_client/src/util/h2/manager.rs b/ylong_http_client/src/util/h2/manager.rs index 05803b4ba9ce33b6cd43f2cf3fa750bc73b6bb46..311b4750fc36fefb134db272e88dc1da32d58f81 100644 --- a/ylong_http_client/src/util/h2/manager.rs +++ b/ylong_http_client/src/util/h2/manager.rs @@ -211,7 +211,12 @@ impl ConnManager { match self.controller.streams.headers(id)? { None => {} Some(header) => { + let is_end_stream = header.flags().is_end_stream(); self.poll_send_frame(header)?; + // Prevent sending empty data frames + if is_end_stream { + return Ok(()); + } } } @@ -341,13 +346,14 @@ impl ConnManager { // The reason for copying the payload is to pass information to the io input to // set the frame encoder, and the input will empty the // payload when it is sent - let new_settings = Frame::new( + let ack_settings = Frame::new( frame.stream_id(), FrameFlags::new(0x1), frame.payload().clone(), ); + self.input_tx - .send(new_settings) + .send(ack_settings) .map_err(|_e| DispatchErrorKind::ChannelClosed)?; Ok(()) }