diff --git a/ylong_http/src/h2/encoder.rs b/ylong_http/src/h2/encoder.rs index c02083014e6dbd38043129d7c53de2147cbc6911..4380a31ded3ac6e39dd07fb136dab7df206cc8dd 100644 --- a/ylong_http/src/h2/encoder.rs +++ b/ylong_http/src/h2/encoder.rs @@ -17,12 +17,15 @@ use crate::h2::{Frame, Goaway, HpackEncoder, Settings}; // TODO: Classify encoder errors per RFC specifications into categories like // stream or connection errors. Identify specific error types such as // Frame_size_error/Protocol Error. +const DEFAULT_MAX_FRAME_SIZE: usize = 16384; + #[derive(Debug)] pub enum FrameEncoderErr { EncodingData, UnexpectedPayloadType, NoCurrentFrame, InternalError, + HeadersNotEnd, } #[derive(PartialEq, Debug)] @@ -82,11 +85,16 @@ enum FrameEncoderState { /// also handles the fragmentation of frames. pub struct FrameEncoder { current_frame: Option, + // `max_frame_size` is actually useless in the Encoder for headers frame and continuation + // frame, because the frame length does not exceed the length of the + // `header_payload_buffer` max_frame_size: usize, max_header_list_size: usize, hpack_encoder: HpackEncoder, state: FrameEncoderState, encoded_bytes: usize, + // `remaining_header_payload` will always be smaller than the minimum max_frame_size, + // because the `header_payload_buffer` length is the minimum max_frame_size. remaining_header_payload: usize, remaining_payload_bytes: usize, is_end_stream: bool, @@ -110,15 +118,15 @@ impl FrameEncoder { remaining_payload_bytes: 0, is_end_stream: false, is_end_headers: false, - // Initialized to default max frame size. - header_payload_buffer: vec![0; 16383], + // Initialized to default max frame size(16384). + header_payload_buffer: vec![0; DEFAULT_MAX_FRAME_SIZE], header_payload_index: 0, } } /// Sets the current frame to be encoded by the `FrameEncoder`. The state of /// the encoder is updated based on the payload type of the frame. - pub fn set_frame(&mut self, frame: Frame) { + pub fn set_frame(&mut self, frame: Frame) -> Result<(), FrameEncoderErr> { self.is_end_stream = frame.flags().is_end_stream(); self.is_end_headers = frame.flags().is_end_headers(); self.current_frame = Some(frame); @@ -129,6 +137,9 @@ impl FrameEncoder { match &self.current_frame { Some(frame) => match frame.payload() { Payload::Headers(headers) => { + if !self.is_end_headers { + return Err(FrameEncoderErr::HeadersNotEnd); + } self.hpack_encoder.set_parts(headers.get_parts()); self.header_payload_index = 0; // TODO: Handle potential scenario where HPACK encoding may not be able to @@ -150,6 +161,7 @@ impl FrameEncoder { }, None => self.state = FrameEncoderState::Idle, } + Ok(()) } /// Encodes the current frame into the given buffer. The state of the @@ -326,6 +338,21 @@ impl FrameEncoder { self.hpack_encoder = HpackEncoder::with_max_size(self.max_header_list_size) } + fn finish_current_frame(&mut self) { + self.header_payload_index = 0; + self.is_end_stream = false; + self.current_frame = None; + self.is_end_headers = false; + self.remaining_header_payload = 0; + self.encoded_bytes = 0; + } + + fn read_rest_payload(&mut self) { + self.header_payload_index = 0; + let payload_size = self.hpack_encoder.encode(&mut self.header_payload_buffer); + self.remaining_header_payload = payload_size; + } + fn encode_headers_frame(&mut self, buf: &mut [u8]) -> Result { if let Some(frame) = &self.current_frame { if let Payload::Headers(_) = frame.payload() { @@ -346,6 +373,7 @@ impl FrameEncoder { if self.encoded_bytes >= frame_header_size { payload_bytes_written = self .write_payload(&mut buf[bytes_written..], self.remaining_header_payload); + self.encoded_bytes += payload_bytes_written; self.headers_header_status(); } @@ -359,14 +387,15 @@ impl FrameEncoder { } fn headers_header_status(&mut self) { - if self.remaining_header_payload <= self.max_frame_size { - self.state = if self.is_end_stream { - FrameEncoderState::HeadersComplete - } else { - FrameEncoderState::EncodingHeadersPayload - }; + self.state = if self.header_payload_index < self.remaining_header_payload { + FrameEncoderState::EncodingHeadersPayload + } else if self.hpack_encoder.is_finished() { + // set_frame ensures that headers must be is_end_headers + self.finish_current_frame(); + FrameEncoderState::HeadersComplete } else { - self.state = FrameEncoderState::EncodingContinuationFrames; + self.read_rest_payload(); + FrameEncoderState::EncodingContinuationFrames } } @@ -390,7 +419,12 @@ impl FrameEncoder { } // The 5th byte represents the frame flags in the frame header. 4 => { - *item = frame.flags().bits(); + if self.hpack_encoder.is_finished() { + *item = frame.flags().bits(); + } else { + // If not finished, it is followed by a Continuation frame. + *item = frame.flags().bits() & 0xFB + } } // The last 4 bytes (6th to 9th) represent the stream identifier in the // frame header. @@ -409,17 +443,15 @@ impl FrameEncoder { fn encode_headers_payload(&mut self, buf: &mut [u8]) -> Result { if let Some(frame) = &self.current_frame { if let Payload::Headers(_) = frame.payload() { - let available_space = buf.len(); - if available_space == 0 { + if buf.is_empty() { return Ok(0); } let payload_bytes_written = self.write_payload(buf, self.remaining_header_payload); self.encoded_bytes += payload_bytes_written; - self.remaining_header_payload -= payload_bytes_written; // Updates the state based on the encoding progress - self.headers_payload_status(); + self.headers_header_status(); Ok(payload_bytes_written) } else { @@ -430,50 +462,26 @@ impl FrameEncoder { } } - fn headers_payload_status(&mut self) { - if self.hpack_encoder.is_finished() { - if self.remaining_header_payload <= self.max_frame_size { - self.state = if self.is_end_stream || self.is_end_headers { - FrameEncoderState::HeadersComplete - } else { - FrameEncoderState::EncodingContinuationFrames - }; - } else { - self.state = FrameEncoderState::EncodingContinuationFrames; - } - } else { - self.state = FrameEncoderState::EncodingHeadersPayload; - } - } - fn encode_continuation_frames(&mut self, buf: &mut [u8]) -> Result { if let Some(frame) = &self.current_frame { if let Payload::Headers(_) = frame.payload() { - if self.remaining_header_payload == 0 { - self.state = FrameEncoderState::HeadersComplete; - return Ok(0); - } let available_space = buf.len(); let frame_header_size = 9; + // TODO allow available_space < 9 if available_space < frame_header_size { return Ok(0); } // Encodes CONTINUATION frame header. + // And this value is always the remaining_header_payload. let continuation_frame_len = self.remaining_header_payload.min(self.max_frame_size); for (buf_index, item) in buf.iter_mut().enumerate().take(3) { *item = ((continuation_frame_len >> (16 - (8 * buf_index))) & 0xFF) as u8; } buf[3] = FrameType::Continuation as u8; let mut new_flags = FrameFlags::empty(); - if self.remaining_header_payload <= self.max_frame_size { - if self.is_end_headers { - // Sets the END_HEADER flag on the last CONTINUATION frame. - new_flags.set_end_headers(true); - } - if self.is_end_stream { - // Sets the END_STREAM flag. - new_flags.set_end_stream(true); - } + if self.remaining_header_payload <= self.max_frame_size && self.hpack_encoder.is_finished() && self.is_end_headers { + // Sets the END_HEADER flag on the last CONTINUATION frame. + new_flags.set_end_headers(true); } buf[4] = new_flags.bits(); @@ -487,10 +495,9 @@ impl FrameEncoder { let payload_bytes_written = self.write_payload(&mut buf[frame_header_size..], continuation_frame_len); self.encoded_bytes += payload_bytes_written; - self.remaining_header_payload -= payload_bytes_written; // Updates the state based on the encoding progress. - self.update_continuation_state(); + self.headers_header_status(); Ok(frame_header_size + payload_bytes_written) } else { @@ -501,19 +508,6 @@ impl FrameEncoder { } } - fn update_continuation_state(&mut self) { - if self.hpack_encoder.is_finished() && self.remaining_header_payload <= self.max_frame_size - { - self.state = if self.is_end_stream || self.is_end_headers { - FrameEncoderState::HeadersComplete - } else { - FrameEncoderState::EncodingContinuationFrames - }; - } else { - self.state = FrameEncoderState::EncodingContinuationFrames; - } - } - fn encode_data_header(&mut self, buf: &mut [u8]) -> Result { if let Some(frame) = &self.current_frame { if let Payload::Data(data_frame) = frame.payload() { @@ -1319,7 +1313,7 @@ mod ut_frame_encoder { Payload::Data(Data::new(data_payload.clone())), ); - encoder.set_frame(data_frame); + encoder.set_frame(data_frame).unwrap(); let mut first_buf = [0u8; 2]; let mut second_buf = [0u8; 38]; @@ -1364,7 +1358,7 @@ mod ut_frame_encoder { let frame = Frame::new(1, frame_flag, Payload::Headers(Headers::new(new_parts))); // Set the current frame for the encoder - frame_encoder.set_frame(frame); + frame_encoder.set_frame(frame).unwrap(); let mut buf = vec![0; 50]; let first_encoded = frame_encoder.encode(&mut buf).unwrap(); @@ -1405,7 +1399,7 @@ mod ut_frame_encoder { let mut first_buf = [0u8; 9]; let mut second_buf = [0u8; 30]; let mut third_buf = [0u8; 6]; - encoder.set_frame(settings_frame); + encoder.set_frame(settings_frame).unwrap(); let first_encoded = encoder.encode(&mut first_buf).unwrap(); assert_eq!(encoder.state, FrameEncoderState::EncodingSettingsPayload); @@ -1464,7 +1458,7 @@ mod ut_frame_encoder { Payload::Ping(Ping { data: ping_payload }), ); - encoder.set_frame(ping_frame); + encoder.set_frame(ping_frame).unwrap(); let mut first_buf = [0u8; 9]; let mut second_buf = [0u8; 8]; @@ -1538,15 +1532,15 @@ mod ut_frame_encoder { Payload::Headers(Headers::new(new_parts.clone())), ); - encoder.set_frame(frame_1); + encoder.set_frame(frame_1).unwrap(); let mut first_buf = [0u8; 50]; let first_encoding = encoder.encode(&mut first_buf).unwrap(); - encoder.set_frame(data_frame); + encoder.set_frame(data_frame).unwrap(); let mut second_buf = [0u8; 50]; let second_encoding = encoder.encode(&mut second_buf).unwrap(); - encoder.set_frame(frame_2); + encoder.set_frame(frame_2).unwrap(); let mut third_buf = [0u8; 50]; let third_encoding = encoder.encode(&mut third_buf).unwrap(); @@ -1583,7 +1577,7 @@ mod ut_frame_encoder { ); // Set the current frame for the encoder - frame_encoder.set_frame(frame); + frame_encoder.set_frame(frame).unwrap(); let mut buf = vec![0; 50]; let first_encoded = frame_encoder.encode(&mut buf).unwrap(); @@ -1628,7 +1622,7 @@ mod ut_frame_encoder { ); // Sets the current frame for the encoder. - frame_encoder.set_frame(frame); + frame_encoder.set_frame(frame).unwrap(); let mut buf = vec![0; 50]; let first_encoded = frame_encoder.encode(&mut buf).unwrap(); @@ -1667,7 +1661,7 @@ mod ut_frame_encoder { let priority_frame = Frame::new(131, FrameFlags::new(0), Payload::Priority(priority_payload)); - encoder.set_frame(priority_frame); + encoder.set_frame(priority_frame).unwrap(); let mut buf = [0u8; 14]; @@ -1727,7 +1721,7 @@ mod ut_frame_encoder { ); // 3. Sets the frame for the encoder. - encoder.set_frame(goaway_frame); + encoder.set_frame(goaway_frame).unwrap(); // 4. Encodes the frame and its payload using buffer segments. let mut first_buf = [0u8; 9]; @@ -1798,7 +1792,7 @@ mod ut_frame_encoder { let settings = Settings::new(settings_payload); let settings_frame = Frame::new(0, FrameFlags::new(0), Payload::Settings(settings)); - encoder.set_frame(settings_frame); + encoder.set_frame(settings_frame).unwrap(); let new_setting = Setting::MaxFrameSize(8192); encoder.update_setting(new_setting.clone()); @@ -1839,7 +1833,7 @@ mod ut_frame_encoder { Payload::Headers(Headers::new(new_parts.clone())), ); - frame_encoder.set_frame(frame); + frame_encoder.set_frame(frame).unwrap(); frame_encoder.state = FrameEncoderState::EncodingContinuationFrames; let mut buf = [0u8; 5000]; @@ -1852,14 +1846,14 @@ mod ut_frame_encoder { Payload::Headers(Headers::new(new_parts.clone())), ); - frame_encoder.set_frame(frame); + frame_encoder.set_frame(frame).unwrap(); frame_encoder.state = FrameEncoderState::EncodingContinuationFrames; assert!(frame_encoder.encode_continuation_frames(&mut buf).is_ok()); let frame_flag = FrameFlags::empty(); let frame = Frame::new(1, frame_flag, Payload::Ping(Ping::new([0; 8]))); - frame_encoder.set_frame(frame); + frame_encoder.set_frame(frame).unwrap(); frame_encoder.state = FrameEncoderState::EncodingContinuationFrames; assert!(frame_encoder.encode_continuation_frames(&mut buf).is_err()); } @@ -1887,14 +1881,14 @@ mod ut_frame_encoder { ); // Sets the frame to the frame_encoder and test padding encoding. - frame_encoder.set_frame(data_frame); + frame_encoder.set_frame(data_frame).unwrap(); frame_encoder.state = FrameEncoderState::EncodingDataPadding; let mut buf = [0u8; 600]; assert!(frame_encoder.encode_padding(&mut buf).is_ok()); let headers_payload = Payload::Headers(Headers::new(Parts::new())); let headers_frame = Frame::new(1, frame_flags.clone(), headers_payload); - frame_encoder.set_frame(headers_frame); + frame_encoder.set_frame(headers_frame).unwrap(); frame_encoder.state = FrameEncoderState::EncodingDataPadding; assert!(frame_encoder.encode_padding(&mut buf).is_err()); diff --git a/ylong_http_client/src/async_impl/conn/http2.rs b/ylong_http_client/src/async_impl/conn/http2.rs index a74a19a96d5bcaf19167271deb023ea2e3c1c937..23cb3ef004ec0c3db402da3c6dd52c14d0c1da28 100644 --- a/ylong_http_client/src/async_impl/conn/http2.rs +++ b/ylong_http_client/src/async_impl/conn/http2.rs @@ -430,4 +430,69 @@ mod ut_http2 { panic!("Unexpected frame type") } } + + /// UT for ensure that the response body(data frame) can read ends normally. + /// + /// # Brief + /// 1. Creates three data frames, one greater than buf, one less than buf, + /// and the last one equal to and finished with buf. + /// 2. The response body data is read from TextIo using a buf of 10 bytes. + /// 3. The body is all read, and the size is the same as the default. + /// 5. Checks that result. + #[cfg(feature = "ylong_base")] + #[test] + fn ut_http2_body_poll_read() { + use std::pin::Pin; + use std::sync::atomic::AtomicBool; + use std::sync::Arc; + + use ylong_http::h2::{Data, Frame, FrameFlags}; + use ylong_runtime::futures::poll_fn; + use ylong_runtime::io::{AsyncRead, ReadBuf}; + + use crate::async_impl::conn::http2::TextIo; + use crate::util::dispatcher::http2::Http2Conn; + + let (resp_tx, resp_rx) = crate::runtime::unbounded_channel(); + let (req_tx, _req_rx) = crate::runtime::unbounded_channel(); + let shutdown = Arc::new(AtomicBool::new(false)); + let mut conn: Http2Conn<()> = Http2Conn::new(1, shutdown, req_tx); + conn.receiver.set_receiver(resp_rx); + let mut text_io = TextIo::new(conn); + let data_1 = Frame::new( + 1, + FrameFlags::new(0), + Payload::Data(Data::new(vec![b'a'; 128])), + ); + let data_2 = Frame::new( + 1, + FrameFlags::new(0), + Payload::Data(Data::new(vec![b'a'; 2])), + ); + let data_3 = Frame::new( + 1, + FrameFlags::new(1), + Payload::Data(Data::new(vec![b'a'; 10])), + ); + let _ = resp_tx.send(crate::util::dispatcher::http2::RespMessage::Output(data_1)); + let _ = resp_tx.send(crate::util::dispatcher::http2::RespMessage::Output(data_2)); + let _ = resp_tx.send(crate::util::dispatcher::http2::RespMessage::Output(data_3)); + ylong_runtime::block_on(async { + let mut buf = [0_u8; 10]; + let mut output_vec = vec![]; + + let mut size = buf.len(); + // `output_vec < 1024` in order to be able to exit normally in case of an + // exception. + while size != 0 && output_vec.len() < 1024 { + let mut buffer = ReadBuf::new(buf.as_mut_slice()); + poll_fn(|cx| Pin::new(&mut text_io).poll_read(cx, &mut buffer)) + .await + .unwrap(); + size = buffer.filled_len(); + output_vec.extend_from_slice(&buf[..size]); + } + assert_eq!(output_vec.len(), 140); + }) + } } diff --git a/ylong_http_client/src/util/h2/input.rs b/ylong_http_client/src/util/h2/input.rs index 2ba09df94926b7c32ac9d22b230a9ce0c6d81697..395398ed5572f104cb707a63453180e302839416 100644 --- a/ylong_http_client/src/util/h2/input.rs +++ b/ylong_http_client/src/util/h2/input.rs @@ -95,12 +95,16 @@ impl Future for SendData { } else { frame }; - sender.encoder.set_frame(frame); + // This error will never happen. + sender.encoder.set_frame(frame).map_err(|_| { + DispatchErrorKind::H2(H2Error::ConnectionError(ErrorCode::IntervalError)) + })?; sender.state = InputState::WriteFrame; } InputState::WriteFrame => { match sender.poll_writer_frame(cx) { - Poll::Ready(_) => {} + Poll::Ready(Ok(())) => {} + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), Poll::Pending => return Poll::Pending, }; sender.state = InputState::RecvFrame;