From 2b635b0411d53886c3bc4dca6908e4eb0e0ffe14 Mon Sep 17 00:00:00 2001 From: Tiga Ultraman Date: Thu, 23 May 2024 12:15:25 +0800 Subject: [PATCH] codecheck 2nd Signed-off-by: Tiga Ultraman --- ylong_http/src/body/chunk.rs | 416 ++++------ ylong_http/src/body/mime/encode/part.rs | 39 +- ylong_http/src/body/mime/mimetype.rs | 45 +- ylong_http/src/h1/request/encoder.rs | 128 +-- ylong_http/src/h2/decoder.rs | 13 +- ylong_http/src/h2/encoder.rs | 742 ++++++++++-------- ylong_http/src/h2/hpack/table.rs | 106 +-- ylong_http/src/request/uri/mod.rs | 20 +- ylong_http_client/examples/async_http.rs | 2 +- .../src/async_impl/conn/http1.rs | 142 ++-- .../src/async_impl/conn/http2.rs | 16 +- ylong_http_client/src/async_impl/http_body.rs | 197 +++-- ylong_http_client/src/util/c_openssl/bio.rs | 50 +- .../src/util/c_openssl/ssl/ctx.rs | 42 +- ylong_http_client/src/util/dispatcher.rs | 94 +-- ylong_http_client/src/util/proxy.rs | 10 +- 16 files changed, 1099 insertions(+), 963 deletions(-) diff --git a/ylong_http/src/body/chunk.rs b/ylong_http/src/body/chunk.rs index cf7a62b..1268a58 100644 --- a/ylong_http/src/body/chunk.rs +++ b/ylong_http/src/body/chunk.rs @@ -167,6 +167,10 @@ impl ChunkBody> { } impl ChunkBody> { + fn chunk_encode(&mut self, dst: &mut [u8]) -> usize { + self.chunk_encode_reader(dst) + } + /// Creates a new `ChunkBody` by `async reader`. /// /// # Examples @@ -187,8 +191,30 @@ impl ChunkBody> { } } - fn chunk_encode(&mut self, dst: &mut [u8]) -> usize { - self.chunk_encode_reader(dst) + fn poll_partial( + &mut self, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + if !self.encode_status.get_flag() { + let mut read_buf = ReadBuf::new(&mut self.chunk_data.chunk_buf); + + match Pin::new(&mut *self.from).poll_read(_cx, &mut read_buf) { + Poll::Ready(Ok(())) => { + let size = read_buf.filled().len(); + self.encode_status.set_flag(true); + // chunk idx reset zero + self.encode_status.set_chunk_idx(0); + self.chunk_data.chunk_last = size; + let data_size = self.chunk_encode(buf); + Poll::Ready(Ok(data_size)) + } + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Pending => Poll::Pending, + } + } else { + Poll::Ready(Ok(self.chunk_encode(buf))) + } } } @@ -274,27 +300,7 @@ impl async_impl::Body for ChunkBody { - if !chunk_body.encode_status.get_flag() { - let mut read_buf = ReadBuf::new(&mut chunk_body.chunk_data.chunk_buf); - - match Pin::new(&mut *chunk_body.from).poll_read(_cx, &mut read_buf) { - Poll::Ready(Ok(())) => { - let size = read_buf.filled().len(); - chunk_body.encode_status.set_flag(true); - // chunk idx reset zero - chunk_body.encode_status.set_chunk_idx(0); - chunk_body.chunk_data.chunk_last = size; - let data_size = chunk_body.chunk_encode(&mut buf[count..]); - Poll::Ready(data_size) - } - Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), - Poll::Pending => Poll::Pending, - } - } else { - Poll::Ready(chunk_body.chunk_encode(&mut buf[count..])) - } - } + DataState::Partial => chunk_body.poll_partial(_cx, &mut buf[count..])?, DataState::Complete => Poll::Ready(chunk_body.trailer_encode(&mut buf[count..])), DataState::Finish => { return Poll::Ready(Ok(count)); @@ -972,37 +978,52 @@ impl ChunkBodyDecoder { } remains = rest; - match (chunk.is_complete(), self.is_last_chunk) { - (false, _) => { - if self.is_chunk_trailer - && (chunk.state == ChunkState::Data || chunk.state == ChunkState::DataCrlf) - { - results.push(chunk); - self.chunk_num += 1; - if remains.is_empty() { - break; - } - } else { - results.push(chunk); - break; - } - } - (true, true) => { - results.push(chunk); - self.is_last_chunk = false; - self.chunk_num = 0; - break; - } - (true, false) => { + if self + .match_decode_result(chunk, &mut results, remains) + .is_some() + { + break; + } + } + Ok((results, remains)) + } + + fn match_decode_result<'b, 'a: 'b>( + &mut self, + chunk: Chunk<'a>, + results: &mut Chunks<'b>, + remains: &[u8], + ) -> Option<()> { + match (chunk.is_complete(), self.is_last_chunk) { + (false, _) => { + if self.is_chunk_trailer + && (chunk.state == ChunkState::Data || chunk.state == ChunkState::DataCrlf) + { results.push(chunk); self.chunk_num += 1; if remains.is_empty() { - break; + return Some(()); } + } else { + results.push(chunk); + return Some(()); + } + } + (true, true) => { + results.push(chunk); + self.is_last_chunk = false; + self.chunk_num = 0; + return Some(()); + } + (true, false) => { + results.push(chunk); + self.chunk_num += 1; + if remains.is_empty() { + return Some(()); } } } - Ok((results, remains)) + None } /// Get trailer headers. @@ -1071,7 +1092,10 @@ impl ChunkBodyDecoder { fn decode_size<'a>(&mut self, buf: &'a [u8]) -> Result<(Chunk<'a>, &'a [u8]), HttpError> { self.stage = Stage::Size; if buf.is_empty() { - return Ok((self.sized_chunk(&buf[..0]), buf)); + return Ok(( + Self::sized_chunk(&buf[..0], None, self.total_size, ChunkState::MetaSize), + buf, + )); } self.chunk_flag = false; for (i, &b) in buf.iter().enumerate() { @@ -1114,17 +1138,25 @@ impl ChunkBodyDecoder { _ => return Err(ErrorKind::InvalidInput.into()), } } - Ok((self.sized_chunk(&buf[..0]), &buf[buf.len()..])) + Ok(( + Self::sized_chunk(&buf[..0], None, self.total_size, ChunkState::MetaSize), + &buf[buf.len()..], + )) } - fn sized_chunk<'a>(&self, buf: &'a [u8]) -> Chunk<'a> { + fn sized_chunk<'a>( + data: &'a [u8], + trailer: Option<&'a [u8]>, + size: usize, + state: ChunkState, + ) -> Chunk<'a> { Chunk { id: 0, - state: ChunkState::MetaSize, - size: self.total_size, + state, + size, extension: ChunkExt::new(), - data: &buf[..0], - trailer: None, + data, + trailer, } } @@ -1153,69 +1185,55 @@ impl ChunkBodyDecoder { fn skip_extension<'a>(&mut self, buf: &'a [u8]) -> Result<(Chunk<'a>, &'a [u8]), HttpError> { self.stage = Stage::Extension; if self.is_chunk_trailer { - for (i, &b) in buf.iter().enumerate() { - match b { - b'\r' => { - if self.cr_meet { - return Err(ErrorKind::InvalidInput.into()); - } - self.cr_meet = true; - return self.skip_trailer_crlf(&buf[i + 1..]); - } - b'\n' => { - if !self.cr_meet { - return Err(ErrorKind::InvalidInput.into()); - } - self.cr_meet = false; + self.skip_trailer_ext(buf) + } else { + self.skip_chunk_ext(buf) + } + } - return self.skip_trailer_crlf(&buf[i..]); - } - _ => {} + fn skip_trailer_ext<'a>(&mut self, buf: &'a [u8]) -> Result<(Chunk<'a>, &'a [u8]), HttpError> { + for (i, &b) in buf.iter().enumerate() { + match b { + b'\r' => { + self.decode_cr()?; + return self.skip_trailer_crlf(&buf[i + 1..]); } + b'\n' => { + self.decode_lf()?; + return self.skip_trailer_crlf(&buf[i..]); + } + _ => {} } - Ok(( - Chunk { - id: 0, - state: ChunkState::MetaExt, - size: self.total_size, - extension: ChunkExt::new(), - data: &buf[..0], - trailer: Some(&buf[..0]), - }, - &buf[buf.len()..], - )) - } else { - for (i, &b) in buf.iter().enumerate() { - match b { - b'\r' => { - if self.cr_meet { - return Err(ErrorKind::InvalidInput.into()); - } - self.cr_meet = true; - return self.skip_crlf(&buf[i + 1..]); - } - b'\n' => { - if !self.cr_meet { - return Err(ErrorKind::InvalidInput.into()); - } - self.cr_meet = false; - return self.skip_crlf(&buf[i..]); - } - _ => {} + } + Ok(( + Self::sized_chunk( + &buf[..0], + Some(&buf[..0]), + self.total_size, + ChunkState::MetaExt, + ), + &buf[buf.len()..], + )) + } + + fn skip_chunk_ext<'a>(&mut self, buf: &'a [u8]) -> Result<(Chunk<'a>, &'a [u8]), HttpError> { + for (i, &b) in buf.iter().enumerate() { + match b { + b'\r' => { + self.decode_cr()?; + return self.skip_crlf(&buf[i + 1..]); } + b'\n' => { + self.decode_lf()?; + return self.skip_crlf(&buf[i..]); + } + _ => {} } - Ok(( - Chunk { - id: 0, - state: ChunkState::MetaExt, - size: self.total_size, - extension: ChunkExt::new(), - data: &buf[..0], - trailer: None, - }, - &buf[buf.len()..], - )) } + Ok(( + Self::sized_chunk(&buf[..0], None, self.total_size, ChunkState::MetaExt), + &buf[buf.len()..], + )) } fn skip_crlf<'a>(&mut self, buf: &'a [u8]) -> Result<(Chunk<'a>, &'a [u8]), HttpError> { @@ -1223,32 +1241,17 @@ impl ChunkBodyDecoder { for (i, &b) in buf.iter().enumerate() { match b { b'\r' => { - if self.cr_meet { - // TODO Check whether the state machine needs to be reused after the parsing - // fails and whether the state machine status needs to be adjusted. - return Err(ErrorKind::InvalidInput.into()); - } - self.cr_meet = true; + self.decode_cr()?; } b'\n' => { - if !self.cr_meet { - return Err(ErrorKind::InvalidInput.into()); - } - self.cr_meet = false; + self.decode_lf()?; return self.decode_data(&buf[i + 1..]); } _ => return Err(ErrorKind::InvalidInput.into()), } } Ok(( - Chunk { - id: 0, - state: ChunkState::MetaCrlf, - size: self.total_size, - extension: ChunkExt::new(), - data: &buf[..0], - trailer: None, - }, + Self::sized_chunk(&buf[..0], None, self.total_size, ChunkState::MetaCrlf), &buf[buf.len()..], )) } @@ -1258,16 +1261,10 @@ impl ChunkBodyDecoder { for (i, &b) in buf.iter().enumerate() { match b { b'\r' => { - if self.cr_meet { - return Err(ErrorKind::InvalidInput.into()); - } - self.cr_meet = true; + self.decode_cr()?; } b'\n' => { - if !self.cr_meet { - return Err(ErrorKind::InvalidInput.into()); - } - self.cr_meet = false; + self.decode_lf()?; self.is_trailer_crlf = true; return self.decode_trailer_data(&buf[i + 1..]); } @@ -1275,18 +1272,32 @@ impl ChunkBodyDecoder { } } Ok(( - Chunk { - id: 0, - state: ChunkState::MetaCrlf, - size: self.total_size, - extension: ChunkExt::new(), - data: &buf[..0], - trailer: Some(&buf[..0]), - }, + Self::sized_chunk( + &buf[..0], + Some(&buf[..0]), + self.total_size, + ChunkState::MetaCrlf, + ), &buf[buf.len()..], )) } + fn decode_cr(&mut self) -> Result<(), HttpError> { + if self.cr_meet { + return Err(ErrorKind::InvalidInput.into()); + } + self.cr_meet = true; + Ok(()) + } + + fn decode_lf(&mut self) -> Result<(), HttpError> { + if !self.cr_meet { + return Err(ErrorKind::InvalidInput.into()); + } + self.cr_meet = false; + Ok(()) + } + fn decode_trailer_data<'a>( &mut self, buf: &'a [u8], @@ -1294,14 +1305,7 @@ impl ChunkBodyDecoder { self.stage = Stage::TrailerData; if buf.is_empty() { return Ok(( - Chunk { - id: 0, - state: ChunkState::Data, - size: 0, - extension: ChunkExt::new(), - data: &buf[..0], - trailer: Some(&buf[..0]), - }, + Self::sized_chunk(&buf[..0], Some(&buf[..0]), 0, ChunkState::Data), &buf[buf.len()..], )); } @@ -1313,32 +1317,20 @@ impl ChunkBodyDecoder { for (i, &b) in buf.iter().enumerate() { match b { b'\r' => { - if self.cr_meet { - return Err(ErrorKind::InvalidInput.into()); - } - self.cr_meet = true; + self.decode_cr()?; return self.skip_trailer_last_crlf(&buf[..i], &buf[i + 1..]); } b'\n' => { - if !self.cr_meet { - return Err(ErrorKind::InvalidInput.into()); - } - self.cr_meet = false; + self.decode_lf()?; return self.skip_trailer_last_crlf(&buf[..i], &buf[i..]); } _ => {} } } self.is_trailer_crlf = false; + Ok(( - Chunk { - id: 0, - state: ChunkState::Data, - size: 0, - extension: ChunkExt::new(), - data: &buf[..0], - trailer: Some(buf), - }, + Self::sized_chunk(&buf[..0], Some(buf), 0, ChunkState::Data), &buf[buf.len()..], )) } @@ -1347,14 +1339,7 @@ impl ChunkBodyDecoder { self.stage = Stage::Data; if buf.is_empty() { return Ok(( - Chunk { - id: 0, - state: ChunkState::Data, - size: self.total_size, - extension: ChunkExt::new(), - data: &buf[..0], - trailer: None, - }, + Self::sized_chunk(&buf[..0], None, self.total_size, ChunkState::Data), &buf[buf.len()..], )); } @@ -1367,14 +1352,7 @@ impl ChunkBodyDecoder { } else { self.rest_size -= buf.len(); Ok(( - Chunk { - id: 0, - state: ChunkState::Data, - size: self.total_size, - extension: ChunkExt::new(), - data: buf, - trailer: None, - }, + Self::sized_chunk(buf, None, self.total_size, ChunkState::Data), &buf[buf.len()..], )) } @@ -1389,57 +1367,30 @@ impl ChunkBodyDecoder { for (i, &b) in buf.iter().enumerate() { match b { b'\r' => { - if self.cr_meet { - return Err(ErrorKind::InvalidInput.into()); - } - self.cr_meet = true; + self.decode_cr()?; } b'\n' => { - if !self.cr_meet { - return Err(ErrorKind::InvalidInput.into()); - } - self.cr_meet = false; + self.decode_lf()?; return if self.is_last_chunk { self.stage = Stage::TrailerEndCrlf; Ok(( - Chunk { - id: 0, - state: ChunkState::Finish, - size: 0, - extension: ChunkExt::new(), - data: &buf[..0], - trailer: Some(&buf[..0]), - }, + Self::sized_chunk(&buf[..0], Some(&buf[..0]), 0, ChunkState::Finish), &buf[i + 1..], )) } else { self.cr_meet = false; self.is_trailer_crlf = true; self.stage = Stage::TrailerData; - let complete_chunk = Chunk { - id: 0, - state: ChunkState::DataCrlf, - size: 0, - extension: ChunkExt::new(), - data: &data[..0], - trailer: Some(data), - }; + let complete_chunk = + Self::sized_chunk(&data[..0], Some(data), 0, ChunkState::DataCrlf); return Ok((complete_chunk, &buf[i + 1..])); }; } _ => return Err(ErrorKind::InvalidInput.into()), } } - Ok(( - Chunk { - id: 0, - state: ChunkState::DataCrlf, - size: 0, - extension: ChunkExt::new(), - data: &data[..0], - trailer: Some(data), - }, + Self::sized_chunk(&data[..0], Some(data), 0, ChunkState::DataCrlf), &buf[buf.len()..], )) } @@ -1453,25 +1404,13 @@ impl ChunkBodyDecoder { for (i, &b) in buf.iter().enumerate() { match b { b'\r' => { - if self.cr_meet { - return Err(ErrorKind::InvalidInput.into()); - } - self.cr_meet = true; + self.decode_cr()?; } b'\n' => { - if !self.cr_meet { - return Err(ErrorKind::InvalidInput.into()); - } - self.cr_meet = false; + self.decode_lf()?; self.stage = Stage::Size; - let complete_chunk = Chunk { - id: 0, - state: ChunkState::Finish, - size: self.total_size, - extension: ChunkExt::new(), - data, - trailer: None, - }; + let complete_chunk = + Self::sized_chunk(data, None, self.total_size, ChunkState::Finish); self.total_size = 0; return Ok((complete_chunk, &buf[i + 1..])); } @@ -1479,14 +1418,7 @@ impl ChunkBodyDecoder { } } Ok(( - Chunk { - id: 0, - state: ChunkState::DataCrlf, - size: self.total_size, - extension: ChunkExt::new(), - data, - trailer: None, - }, + Self::sized_chunk(data, None, self.total_size, ChunkState::DataCrlf), &buf[buf.len()..], )) } diff --git a/ylong_http/src/body/mime/encode/part.rs b/ylong_http/src/body/mime/encode/part.rs index ae0ee87..482a54b 100644 --- a/ylong_http/src/body/mime/encode/part.rs +++ b/ylong_http/src/body/mime/encode/part.rs @@ -16,6 +16,7 @@ use core::pin::Pin; use core::task::{Context, Poll}; use std::io::Read; +use crate::body::async_impl::Body; use crate::body::mime::common::{data_copy, SizeResult, TokenStatus}; use crate::body::mime::{EncodeHeaders, MixFrom, PartStatus}; use crate::body::{async_impl, sync_impl, MimePart}; @@ -199,20 +200,7 @@ impl async_impl::Body for MimePartEncoder<'_> { PartStatus::Start => Poll::Ready(self.start_encode()), PartStatus::Headers => Poll::Ready(self.headers_encode(&mut buf[count..])), PartStatus::Crlf => Poll::Ready(self.crlf_encode(&mut buf[count..])), - PartStatus::Body => match &mut self.body { - Some(body) => { - let poll_result = Pin::new(body).poll_data(cx, &mut buf[count..]); - if let Poll::Ready(Ok(0)) = poll_result { - // complete async read body - self.check_next(); - }; - poll_result - } - _ => { - self.check_next(); - Poll::Ready(Ok(0)) - } - }, + PartStatus::Body => self.poll_mime_body(cx, &mut buf[count..]), PartStatus::End => return Poll::Ready(Ok(count)), }; @@ -226,6 +214,29 @@ impl async_impl::Body for MimePartEncoder<'_> { } } +impl MimePartEncoder<'_> { + fn poll_mime_body( + &mut self, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + match self.body { + Some(ref mut body) => { + let poll_result = Pin::new(body).poll_data(cx, buf); + if let Poll::Ready(Ok(0)) = poll_result { + // complete async read body + self.check_next(); + }; + poll_result + } + _ => { + self.check_next(); + Poll::Ready(Ok(0)) + } + } + } +} + #[cfg(test)] mod ut_mime_part_encoder { use crate::body::{async_impl, sync_impl, MimePart, MimePartEncoder}; diff --git a/ylong_http/src/body/mime/mimetype.rs b/ylong_http/src/body/mime/mimetype.rs index 0dfc983..2fbebdc 100644 --- a/ylong_http/src/body/mime/mimetype.rs +++ b/ylong_http/src/body/mime/mimetype.rs @@ -581,13 +581,7 @@ impl MimeTypeTag { } } b'a' => { - // application - if b[1..].eq_ignore_ascii_case(b"pplication") { - return Ok(Self::Application); - // audio - } else if b[1..].eq_ignore_ascii_case(b"udio") { - return Ok(Self::Audio); - } + return Self::mime_byte_a(b); } b'f' => { // font @@ -602,16 +596,7 @@ impl MimeTypeTag { } } b'm' => { - // message - if b[1..].eq_ignore_ascii_case(b"essage") { - return Ok(Self::Message); - // model - } else if b[1..].eq_ignore_ascii_case(b"odel") { - return Ok(Self::Model); - // multipart - } else if b[1..].eq_ignore_ascii_case(b"ultipart") { - return Ok(Self::Multipart); - } + return Self::mime_byte_m(b); } b't' => { // text @@ -630,6 +615,32 @@ impl MimeTypeTag { Err(ErrorKind::InvalidInput.into()) } + + fn mime_byte_a(b: &[u8]) -> Result { + // application + if b[1..].eq_ignore_ascii_case(b"pplication") { + Ok(Self::Application) + // audio + } else if b[1..].eq_ignore_ascii_case(b"udio") { + Ok(Self::Audio) + } else { + Err(ErrorKind::InvalidInput.into()) + } + } + fn mime_byte_m(b: &[u8]) -> Result { + // message + if b[1..].eq_ignore_ascii_case(b"essage") { + Ok(Self::Message) + // model + } else if b[1..].eq_ignore_ascii_case(b"odel") { + Ok(Self::Model) + // multipart + } else if b[1..].eq_ignore_ascii_case(b"ultipart") { + Ok(Self::Multipart) + } else { + Err(ErrorKind::InvalidInput.into()) + } + } } // From [RFC6838](http://tools.ietf.org/html/rfc6838#section-4.2): diff --git a/ylong_http/src/h1/request/encoder.rs b/ylong_http/src/h1/request/encoder.rs index 313b33f..a412c5f 100644 --- a/ylong_http/src/h1/request/encoder.rs +++ b/ylong_http/src/h1/request/encoder.rs @@ -532,70 +532,80 @@ impl EncodeHeader { fn encode(&mut self, buf: &mut [u8]) -> TokenResult { match self.status.take().unwrap() { - HeaderStatus::Name => { - let name = self.name.as_bytes(); - let mut task = WriteData::new(name, &mut self.name_idx, buf); - match task.write()? { - TokenStatus::Complete(size) => { - self.status = Some(HeaderStatus::Colon); - Ok(TokenStatus::Partial(size)) - } - TokenStatus::Partial(size) => { - self.status = Some(HeaderStatus::Name); - Ok(TokenStatus::Partial(size)) - } - } + HeaderStatus::Name => self.encode_name(buf), + HeaderStatus::Colon => self.encode_colon(buf), + HeaderStatus::Value => self.encode_value(buf), + HeaderStatus::Crlf(crlf) => self.encode_crlf(buf, crlf), + HeaderStatus::EmptyHeader => Ok(TokenStatus::Complete(0)), + } + } + + fn encode_name(&mut self, buf: &mut [u8]) -> TokenResult { + let name = self.name.as_bytes(); + let mut task = WriteData::new(name, &mut self.name_idx, buf); + match task.write()? { + TokenStatus::Complete(size) => { + self.status = Some(HeaderStatus::Colon); + Ok(TokenStatus::Partial(size)) } - HeaderStatus::Colon => { - let colon = ":".as_bytes(); - let mut task = WriteData::new(colon, &mut self.colon_idx, buf); - match task.write()? { - TokenStatus::Complete(size) => { - self.status = Some(HeaderStatus::Value); - Ok(TokenStatus::Partial(size)) - } - TokenStatus::Partial(size) => { - self.status = Some(HeaderStatus::Colon); - Ok(TokenStatus::Partial(size)) - } - } + TokenStatus::Partial(size) => { + self.status = Some(HeaderStatus::Name); + Ok(TokenStatus::Partial(size)) } - HeaderStatus::Value => { - let value = self.value.as_slice(); - let mut task = WriteData::new(value, &mut self.value_idx, buf); - match task.write()? { - TokenStatus::Complete(size) => { - let crlf = EncodeCrlf::new(); - self.status = Some(HeaderStatus::Crlf(crlf)); - Ok(TokenStatus::Partial(size)) - } - TokenStatus::Partial(size) => { - self.status = Some(HeaderStatus::Value); - Ok(TokenStatus::Partial(size)) - } - } + } + } + + fn encode_colon(&mut self, buf: &mut [u8]) -> TokenResult { + let colon = ":".as_bytes(); + let mut task = WriteData::new(colon, &mut self.colon_idx, buf); + match task.write()? { + TokenStatus::Complete(size) => { + self.status = Some(HeaderStatus::Value); + Ok(TokenStatus::Partial(size)) } - HeaderStatus::Crlf(mut crlf) => match crlf.encode(buf)? { - TokenStatus::Complete(size) => { - if let Some(iter) = self.inner.next() { - let (header_name, header_value) = iter; - self.status = Some(HeaderStatus::Name); - self.name = header_name; - self.value = header_value.to_string().unwrap().into_bytes(); - self.name_idx = 0; - self.colon_idx = 0; - self.value_idx = 0; - Ok(TokenStatus::Partial(size)) - } else { - Ok(TokenStatus::Complete(size)) - } - } - TokenStatus::Partial(size) => { - self.status = Some(HeaderStatus::Crlf(crlf)); + TokenStatus::Partial(size) => { + self.status = Some(HeaderStatus::Colon); + Ok(TokenStatus::Partial(size)) + } + } + } + + fn encode_value(&mut self, buf: &mut [u8]) -> TokenResult { + let value = self.value.as_slice(); + let mut task = WriteData::new(value, &mut self.value_idx, buf); + match task.write()? { + TokenStatus::Complete(size) => { + let crlf = EncodeCrlf::new(); + self.status = Some(HeaderStatus::Crlf(crlf)); + Ok(TokenStatus::Partial(size)) + } + TokenStatus::Partial(size) => { + self.status = Some(HeaderStatus::Value); + Ok(TokenStatus::Partial(size)) + } + } + } + + fn encode_crlf(&mut self, buf: &mut [u8], mut crlf: EncodeCrlf) -> TokenResult { + match crlf.encode(buf)? { + TokenStatus::Complete(size) => { + if let Some(iter) = self.inner.next() { + let (header_name, header_value) = iter; + self.status = Some(HeaderStatus::Name); + self.name = header_name; + self.value = header_value.to_string().unwrap().into_bytes(); + self.name_idx = 0; + self.colon_idx = 0; + self.value_idx = 0; Ok(TokenStatus::Partial(size)) + } else { + Ok(TokenStatus::Complete(size)) } - }, - HeaderStatus::EmptyHeader => Ok(TokenStatus::Complete(0)), + } + TokenStatus::Partial(size) => { + self.status = Some(HeaderStatus::Crlf(crlf)); + Ok(TokenStatus::Partial(size)) + } } } } diff --git a/ylong_http/src/h2/decoder.rs b/ylong_http/src/h2/decoder.rs index 23323ee..ca1bd72 100644 --- a/ylong_http/src/h2/decoder.rs +++ b/ylong_http/src/h2/decoder.rs @@ -755,6 +755,7 @@ impl FrameDecoder { if fragment_start_index > fragment_end_index { return Err(H2Error::ConnectionError(ErrorCode::ProtocolError)); } + self.hpack.hpack_decode(buf)?; let promised_stream_id = if is_padded { get_stream_id(&buf[1..5]) } else { @@ -763,9 +764,15 @@ impl FrameDecoder { if is_connection_frame(promised_stream_id as usize) { return Err(H2Error::ConnectionError(ErrorCode::ProtocolError)); } + self.push_promise_framing(end_headers, promised_stream_id) + } + + fn push_promise_framing( + &mut self, + end_headers: bool, + promised_stream_id: u32, + ) -> Result { if end_headers { - self.hpack - .hpack_decode(&buf[fragment_start_index..fragment_end_index])?; let headers = self.hpack.hpack_finish()?; let frame = Frame::new( self.header.stream_id, @@ -781,8 +788,6 @@ impl FrameDecoder { self.continuations.is_end_headers = false; self.continuations.stream_id = self.header.stream_id; self.continuations.promised_stream_id = promised_stream_id; - self.hpack - .hpack_decode(&buf[fragment_start_index..fragment_end_index])?; Ok(FrameKind::Partial) } } diff --git a/ylong_http/src/h2/encoder.rs b/ylong_http/src/h2/encoder.rs index a9257aa..c020830 100644 --- a/ylong_http/src/h2/encoder.rs +++ b/ylong_http/src/h2/encoder.rs @@ -11,8 +11,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::h2::frame::{FrameFlags, FrameType, Payload, Setting}; -use crate::h2::{Frame, HpackEncoder}; +use crate::h2::frame::{FrameFlags, FrameType, Payload, Priority, Setting}; +use crate::h2::{Frame, Goaway, HpackEncoder, Settings}; // TODO: Classify encoder errors per RFC specifications into categories like // stream or connection errors. Identify specific error types such as @@ -338,34 +338,7 @@ impl FrameEncoder { }; let bytes_to_write = remaining_header_bytes.min(buf.len()); - for (buf_index, item) in buf.iter_mut().enumerate().take(bytes_to_write) { - let header_byte_index = self.encoded_bytes + buf_index; - match header_byte_index { - // The first 3 bytes represent the payload length in the frame header. - 0..=2 => { - let payload_len = self.remaining_header_payload; - *item = ((payload_len >> (16 - (8 * header_byte_index))) & 0xFF) as u8; - } - // The 4th byte represents the frame type in the frame header. - 3 => { - *item = FrameType::Headers as u8; - } - // The 5th byte represents the frame flags in the frame header. - 4 => { - *item = frame.flags().bits(); - } - // The last 4 bytes (6th to 9th) represent the stream identifier in the - // frame header. - 5..=8 => { - let stream_id_byte_index = header_byte_index - 5; - *item = (frame.stream_id() >> (24 - (8 * stream_id_byte_index))) as u8; - } - _ => { - return Err(FrameEncoderErr::InternalError); - } - } - } - + self.iterate_headers_header(frame, buf, bytes_to_write)?; self.encoded_bytes += bytes_to_write; let bytes_written = bytes_to_write; let mut payload_bytes_written = 0; @@ -373,16 +346,7 @@ impl FrameEncoder { if self.encoded_bytes >= frame_header_size { payload_bytes_written = self .write_payload(&mut buf[bytes_written..], self.remaining_header_payload); - - if self.remaining_header_payload <= self.max_frame_size { - self.state = if self.is_end_stream { - FrameEncoderState::HeadersComplete - } else { - FrameEncoderState::EncodingHeadersPayload - }; - } else { - self.state = FrameEncoderState::EncodingContinuationFrames; - } + self.headers_header_status(); } Ok(bytes_written + payload_bytes_written) @@ -394,6 +358,54 @@ impl FrameEncoder { } } + fn headers_header_status(&mut self) { + if self.remaining_header_payload <= self.max_frame_size { + self.state = if self.is_end_stream { + FrameEncoderState::HeadersComplete + } else { + FrameEncoderState::EncodingHeadersPayload + }; + } else { + self.state = FrameEncoderState::EncodingContinuationFrames; + } + } + + fn iterate_headers_header( + &self, + frame: &Frame, + buf: &mut [u8], + len: usize, + ) -> Result<(), FrameEncoderErr> { + for (buf_index, item) in buf.iter_mut().enumerate().take(len) { + let header_byte_index = self.encoded_bytes + buf_index; + match header_byte_index { + // The first 3 bytes represent the payload length in the frame header. + 0..=2 => { + let payload_len = self.remaining_header_payload; + *item = ((payload_len >> (16 - (8 * header_byte_index))) & 0xFF) as u8; + } + // The 4th byte represents the frame type in the frame header. + 3 => { + *item = FrameType::Headers as u8; + } + // The 5th byte represents the frame flags in the frame header. + 4 => { + *item = frame.flags().bits(); + } + // The last 4 bytes (6th to 9th) represent the stream identifier in the + // frame header. + 5..=8 => { + let stream_id_byte_index = header_byte_index - 5; + *item = (frame.stream_id() >> (24 - (8 * stream_id_byte_index))) as u8; + } + _ => { + return Err(FrameEncoderErr::InternalError); + } + } + } + Ok(()) + } + fn encode_headers_payload(&mut self, buf: &mut [u8]) -> Result { if let Some(frame) = &self.current_frame { if let Payload::Headers(_) = frame.payload() { @@ -407,19 +419,7 @@ impl FrameEncoder { self.remaining_header_payload -= payload_bytes_written; // Updates the state based on the encoding progress - if self.hpack_encoder.is_finished() { - if self.remaining_header_payload <= self.max_frame_size { - self.state = if self.is_end_stream || self.is_end_headers { - FrameEncoderState::HeadersComplete - } else { - FrameEncoderState::EncodingContinuationFrames - }; - } else { - self.state = FrameEncoderState::EncodingContinuationFrames; - } - } else { - self.state = FrameEncoderState::EncodingHeadersPayload; - } + self.headers_payload_status(); Ok(payload_bytes_written) } else { @@ -430,6 +430,22 @@ impl FrameEncoder { } } + fn headers_payload_status(&mut self) { + if self.hpack_encoder.is_finished() { + if self.remaining_header_payload <= self.max_frame_size { + self.state = if self.is_end_stream || self.is_end_headers { + FrameEncoderState::HeadersComplete + } else { + FrameEncoderState::EncodingContinuationFrames + }; + } else { + self.state = FrameEncoderState::EncodingContinuationFrames; + } + } else { + self.state = FrameEncoderState::EncodingHeadersPayload; + } + } + fn encode_continuation_frames(&mut self, buf: &mut [u8]) -> Result { if let Some(frame) = &self.current_frame { if let Payload::Headers(_) = frame.payload() { @@ -437,13 +453,11 @@ impl FrameEncoder { self.state = FrameEncoderState::HeadersComplete; return Ok(0); } - let available_space = buf.len(); let frame_header_size = 9; if available_space < frame_header_size { return Ok(0); } - // Encodes CONTINUATION frame header. let continuation_frame_len = self.remaining_header_payload.min(self.max_frame_size); for (buf_index, item) in buf.iter_mut().enumerate().take(3) { @@ -476,17 +490,7 @@ impl FrameEncoder { self.remaining_header_payload -= payload_bytes_written; // Updates the state based on the encoding progress. - if self.hpack_encoder.is_finished() - && self.remaining_header_payload <= self.max_frame_size - { - self.state = if self.is_end_stream || self.is_end_headers { - FrameEncoderState::HeadersComplete - } else { - FrameEncoderState::EncodingContinuationFrames - }; - } else { - self.state = FrameEncoderState::EncodingContinuationFrames; - } + self.update_continuation_state(); Ok(frame_header_size + payload_bytes_written) } else { @@ -497,6 +501,19 @@ impl FrameEncoder { } } + fn update_continuation_state(&mut self) { + if self.hpack_encoder.is_finished() && self.remaining_header_payload <= self.max_frame_size + { + self.state = if self.is_end_stream || self.is_end_headers { + FrameEncoderState::HeadersComplete + } else { + FrameEncoderState::EncodingContinuationFrames + }; + } else { + self.state = FrameEncoderState::EncodingContinuationFrames; + } + } + fn encode_data_header(&mut self, buf: &mut [u8]) -> Result { if let Some(frame) = &self.current_frame { if let Payload::Data(data_frame) = frame.payload() { @@ -509,33 +526,7 @@ impl FrameEncoder { }; let bytes_to_write = remaining_header_bytes.min(buf.len()); - for (buf_index, item) in buf.iter_mut().enumerate().take(bytes_to_write) { - let header_byte_index = self.encoded_bytes + buf_index; - match header_byte_index { - // The first 3 bytes represent the payload length in the frame header. - 0..=2 => { - let payload_len = data_frame.data().len(); - *item = ((payload_len >> (16 - (8 * header_byte_index))) & 0xFF) as u8; - } - // The 4th byte represents the frame type in the frame header. - 3 => { - *item = frame.payload().frame_type() as u8; - } - // The 5th byte represents the frame flags in the frame header. - 4 => { - *item = frame.flags().bits(); - } - // The last 4 bytes (6th to 9th) represent the stream identifier in the - // frame header. - 5..=8 => { - let stream_id_byte_index = header_byte_index - 5; - *item = (frame.stream_id() >> (24 - (8 * stream_id_byte_index))) as u8; - } - _ => { - return Err(FrameEncoderErr::InternalError); - } - } - } + self.iterate_data_header(frame, buf, data_frame.data().len(), bytes_to_write)?; self.encoded_bytes += bytes_to_write; if self.encoded_bytes == frame_header_size { @@ -551,6 +542,42 @@ impl FrameEncoder { } } + fn iterate_data_header( + &self, + frame: &Frame, + buf: &mut [u8], + payload_len: usize, + len: usize, + ) -> Result<(), FrameEncoderErr> { + for (buf_index, item) in buf.iter_mut().enumerate().take(len) { + let header_byte_index = self.encoded_bytes + buf_index; + match header_byte_index { + // The first 3 bytes represent the payload length in the frame header. + 0..=2 => { + *item = ((payload_len >> (16 - (8 * header_byte_index))) & 0xFF) as u8; + } + // The 4th byte represents the frame type in the frame header. + 3 => { + *item = frame.payload().frame_type() as u8; + } + // The 5th byte represents the frame flags in the frame header. + 4 => { + *item = frame.flags().bits(); + } + // The last 4 bytes (6th to 9th) represent the stream identifier in the + // frame header. + 5..=8 => { + let stream_id_byte_index = header_byte_index - 5; + *item = (frame.stream_id() >> (24 - (8 * stream_id_byte_index))) as u8; + } + _ => { + return Err(FrameEncoderErr::InternalError); + } + } + } + Ok(()) + } + fn encode_data_payload(&mut self, buf: &mut [u8]) -> Result { if let Some(frame) = self.current_frame.as_ref() { if let Payload::Data(data_frame) = frame.payload() { @@ -622,33 +649,7 @@ impl FrameEncoder { frame_header_size - self.encoded_bytes }; let bytes_to_write = remaining_header_bytes.min(buf.len()); - for (buf_index, item) in buf.iter_mut().enumerate().take(bytes_to_write) { - let header_byte_index = self.encoded_bytes + buf_index; - match header_byte_index { - 0..=2 => { - if let Payload::Goaway(goaway_payload) = frame.payload() { - let payload_size = goaway_payload.encoded_len(); - *item = - ((payload_size >> (8 * (2 - header_byte_index))) & 0xFF) as u8; - } else { - return Err(FrameEncoderErr::UnexpectedPayloadType); - } - } - 3 => { - *item = FrameType::Goaway as u8; - } - 4 => { - *item = frame.flags().bits(); - } - 5..=8 => { - let stream_id_byte_index = header_byte_index - 5; - *item = (frame.stream_id() >> (24 - (8 * stream_id_byte_index))) as u8; - } - _ => { - return Err(FrameEncoderErr::InternalError); - } - } - } + self.iterate_goaway_header(frame, buf, bytes_to_write)?; self.encoded_bytes += bytes_to_write; if self.encoded_bytes == frame_header_size { self.state = FrameEncoderState::EncodingGoawayPayload; @@ -662,6 +663,41 @@ impl FrameEncoder { } } + fn iterate_goaway_header( + &self, + frame: &Frame, + buf: &mut [u8], + len: usize, + ) -> Result<(), FrameEncoderErr> { + for (buf_index, item) in buf.iter_mut().enumerate().take(len) { + let header_byte_index = self.encoded_bytes + buf_index; + match header_byte_index { + 0..=2 => { + if let Payload::Goaway(goaway_payload) = frame.payload() { + let payload_size = goaway_payload.encoded_len(); + *item = ((payload_size >> (8 * (2 - header_byte_index))) & 0xFF) as u8; + } else { + return Err(FrameEncoderErr::UnexpectedPayloadType); + } + } + 3 => { + *item = FrameType::Goaway as u8; + } + 4 => { + *item = frame.flags().bits(); + } + 5..=8 => { + let stream_id_byte_index = header_byte_index - 5; + *item = (frame.stream_id() >> (24 - (8 * stream_id_byte_index))) as u8; + } + _ => { + return Err(FrameEncoderErr::InternalError); + } + } + } + Ok(()) + } + fn encode_goaway_payload(&mut self, buf: &mut [u8]) -> Result { if let Some(frame) = &self.current_frame { if let Payload::Goaway(goaway) = frame.payload() { @@ -669,31 +705,8 @@ impl FrameEncoder { let remaining_payload_bytes = payload_size.saturating_sub(self.encoded_bytes.saturating_sub(9)); let bytes_to_write = remaining_payload_bytes.min(buf.len()); - for (buf_index, buf_item) in buf.iter_mut().enumerate().take(bytes_to_write) { - let payload_byte_index = self.encoded_bytes - 9 + buf_index; - match payload_byte_index { - 0..=3 => { - let last_stream_id_byte_index = payload_byte_index; - *buf_item = (goaway.get_last_stream_id() - >> (24 - (8 * last_stream_id_byte_index))) - as u8; - } - 4..=7 => { - let error_code_byte_index = payload_byte_index - 4; - *buf_item = (goaway.get_error_code() - >> (24 - (8 * error_code_byte_index))) - as u8; - } - _ => { - let debug_data_index = payload_byte_index - 8; - if debug_data_index < goaway.get_debug_data().len() { - *buf_item = goaway.get_debug_data()[debug_data_index]; - } else { - return Err(FrameEncoderErr::InternalError); - } - } - } - } + + self.iterate_goaway_payload(goaway, buf, bytes_to_write)?; self.encoded_bytes += bytes_to_write; if self.encoded_bytes == 9 + payload_size { self.state = FrameEncoderState::DataComplete; @@ -708,6 +721,39 @@ impl FrameEncoder { } } + fn iterate_goaway_payload( + &self, + goaway: &Goaway, + buf: &mut [u8], + len: usize, + ) -> Result<(), FrameEncoderErr> { + for (buf_index, buf_item) in buf.iter_mut().enumerate().take(len) { + let payload_byte_index = self.encoded_bytes - 9 + buf_index; + match payload_byte_index { + 0..=3 => { + let last_stream_id_byte_index = payload_byte_index; + *buf_item = (goaway.get_last_stream_id() + >> (24 - (8 * last_stream_id_byte_index))) + as u8; + } + 4..=7 => { + let error_code_byte_index = payload_byte_index - 4; + *buf_item = + (goaway.get_error_code() >> (24 - (8 * error_code_byte_index))) as u8; + } + _ => { + let debug_data_index = payload_byte_index - 8; + if debug_data_index < goaway.get_debug_data().len() { + *buf_item = goaway.get_debug_data()[debug_data_index]; + } else { + return Err(FrameEncoderErr::InternalError); + } + } + } + } + Ok(()) + } + fn encode_window_update_frame(&mut self, buf: &mut [u8]) -> Result { if let Some(frame) = &self.current_frame { if let Payload::WindowUpdate(_) = frame.payload() { @@ -719,37 +765,7 @@ impl FrameEncoder { frame_header_size - self.encoded_bytes }; let bytes_to_write = remaining_header_bytes.min(buf.len()); - for (buf_index, item) in buf.iter_mut().enumerate().take(bytes_to_write) { - let header_byte_index = self.encoded_bytes + buf_index; - match header_byte_index { - // The first 3 bytes represent the payload length in the frame header. For - // WindowUpdate frame, this is always 4 bytes. - 0..=1 => { - *item = 0; - } - 2 => { - // Window Update frame payload size is always 4 bytes. - *item = 4; - } - // The 4th byte represents the frame type in the frame header. - 3 => { - *item = FrameType::WindowUpdate as u8; - } - // The 5th byte represents the frame flags in the frame header. - 4 => { - *item = frame.flags().bits(); - } - // The last 4 bytes (6th to 9th) represent the stream identifier in the - // frame header. - 5..=8 => { - let stream_id_byte_index = header_byte_index - 5; - *item = (frame.stream_id() >> (24 - (8 * stream_id_byte_index))) as u8; - } - _ => { - return Err(FrameEncoderErr::InternalError); - } - } - } + self.iterate_window_update_header(frame, buf, bytes_to_write)?; self.encoded_bytes += bytes_to_write; if self.encoded_bytes == frame_header_size { self.state = FrameEncoderState::EncodingWindowUpdatePayload; @@ -765,6 +781,46 @@ impl FrameEncoder { } } + fn iterate_window_update_header( + &self, + frame: &Frame, + buf: &mut [u8], + len: usize, + ) -> Result<(), FrameEncoderErr> { + for (buf_index, item) in buf.iter_mut().enumerate().take(len) { + let header_byte_index = self.encoded_bytes + buf_index; + match header_byte_index { + // The first 3 bytes represent the payload length in the frame header. For + // WindowUpdate frame, this is always 4 bytes. + 0..=1 => { + *item = 0; + } + 2 => { + // Window Update frame payload size is always 4 bytes. + *item = 4; + } + // The 4th byte represents the frame type in the frame header. + 3 => { + *item = FrameType::WindowUpdate as u8; + } + // The 5th byte represents the frame flags in the frame header. + 4 => { + *item = frame.flags().bits(); + } + // The last 4 bytes (6th to 9th) represent the stream identifier in the + // frame header. + 5..=8 => { + let stream_id_byte_index = header_byte_index - 5; + *item = (frame.stream_id() >> (24 - (8 * stream_id_byte_index))) as u8; + } + _ => { + return Err(FrameEncoderErr::InternalError); + } + } + } + Ok(()) + } + fn encode_window_update_payload(&mut self, buf: &mut [u8]) -> Result { if let Some(frame) = &self.current_frame { if let Payload::WindowUpdate(window_update) = frame.payload() { @@ -805,34 +861,12 @@ impl FrameEncoder { frame_header_size - self.encoded_bytes }; let bytes_to_write = remaining_header_bytes.min(buf.len()); - for buf_index in 0..bytes_to_write { - let header_byte_index = self.encoded_bytes + buf_index; - match header_byte_index { - // The first 3 bytes represent the payload length in the frame header. - 0..=2 => { - let payload_len = settings.get_settings().len() * 6; - buf[buf_index] = ((payload_len >> (16 - (8 * buf_index))) & 0xFF) as u8; - } - // The 4th byte represents the frame type in the frame header. - 3 => { - buf[3] = FrameType::Settings as u8; - } - // The 5th byte represents the frame flags in the frame header. - 4 => { - buf[4] = frame.flags().bits(); - } - // The last 4 bytes (6th to 9th) represent the stream identifier in the - // frame header. For SETTINGS frames, this should - // always be 0. - 5..=8 => { - // Stream ID should be 0 for SETTINGS frames. - buf[buf_index] = 0; - } - _ => { - return Err(FrameEncoderErr::InternalError); - } - } - } + self.iterate_settings_header( + frame, + buf, + settings.get_settings().len() * 6, + bytes_to_write, + )?; self.encoded_bytes += bytes_to_write; if self.encoded_bytes == frame_header_size { self.state = FrameEncoderState::EncodingSettingsPayload; @@ -846,6 +880,43 @@ impl FrameEncoder { } } + fn iterate_settings_header( + &self, + frame: &Frame, + buf: &mut [u8], + payload_len: usize, + len: usize, + ) -> Result<(), FrameEncoderErr> { + for buf_index in 0..len { + let header_byte_index = self.encoded_bytes + buf_index; + match header_byte_index { + // The first 3 bytes represent the payload length in the frame header. + 0..=2 => { + buf[buf_index] = ((payload_len >> (16 - (8 * buf_index))) & 0xFF) as u8; + } + // The 4th byte represents the frame type in the frame header. + 3 => { + buf[3] = FrameType::Settings as u8; + } + // The 5th byte represents the frame flags in the frame header. + 4 => { + buf[4] = frame.flags().bits(); + } + // The last 4 bytes (6th to 9th) represent the stream identifier in the + // frame header. For SETTINGS frames, this should + // always be 0. + 5..=8 => { + // Stream ID should be 0 for SETTINGS frames. + buf[buf_index] = 0; + } + _ => { + return Err(FrameEncoderErr::InternalError); + } + } + } + Ok(()) + } + fn encode_settings_payload(&mut self, buf: &mut [u8]) -> Result { if let Some(frame) = &self.current_frame { if let Payload::Settings(settings) = frame.payload() { @@ -853,36 +924,7 @@ impl FrameEncoder { let remaining_payload_bytes = settings_len.saturating_sub(self.encoded_bytes.saturating_sub(9)); let bytes_to_write = remaining_payload_bytes.min(buf.len()); - for (buf_index, buf_item) in buf.iter_mut().enumerate().take(bytes_to_write) { - let payload_byte_index = self.encoded_bytes - 9 + buf_index; - let setting_index = payload_byte_index / 6; - let setting_byte_index = payload_byte_index % 6; - - if let Some(setting) = settings.get_settings().get(setting_index) { - let (id, value) = match setting { - Setting::HeaderTableSize(v) => (0x1, *v), - Setting::EnablePush(v) => (0x2, *v as u32), - Setting::MaxConcurrentStreams(v) => (0x3, *v), - Setting::InitialWindowSize(v) => (0x4, *v), - Setting::MaxFrameSize(v) => (0x5, *v), - Setting::MaxHeaderListSize(v) => (0x6, *v), - }; - match setting_byte_index { - 0..=1 => { - *buf_item = ((id >> (8 * (1 - setting_byte_index))) & 0xFF) as u8; - } - 2..=5 => { - let shift_amount = 8 * (3 - (setting_byte_index - 2)); - *buf_item = ((value >> shift_amount) & 0xFF) as u8; - } - _ => { - return Err(FrameEncoderErr::InternalError); - } - } - } else { - return Err(FrameEncoderErr::InternalError); - } - } + self.iterate_settings_payload(settings, buf, bytes_to_write)?; self.encoded_bytes += bytes_to_write; if self.encoded_bytes == 9 + settings_len { self.state = FrameEncoderState::DataComplete; @@ -897,6 +939,45 @@ impl FrameEncoder { } } + fn iterate_settings_payload( + &self, + settings: &Settings, + buf: &mut [u8], + len: usize, + ) -> Result<(), FrameEncoderErr> { + for (buf_index, buf_item) in buf.iter_mut().enumerate().take(len) { + let payload_byte_index = self.encoded_bytes - 9 + buf_index; + let setting_index = payload_byte_index / 6; + let setting_byte_index = payload_byte_index % 6; + + if let Some(setting) = settings.get_settings().get(setting_index) { + let (id, value) = match setting { + Setting::HeaderTableSize(v) => (0x1, *v), + Setting::EnablePush(v) => (0x2, *v as u32), + Setting::MaxConcurrentStreams(v) => (0x3, *v), + Setting::InitialWindowSize(v) => (0x4, *v), + Setting::MaxFrameSize(v) => (0x5, *v), + Setting::MaxHeaderListSize(v) => (0x6, *v), + }; + match setting_byte_index { + 0..=1 => { + *buf_item = ((id >> (8 * (1 - setting_byte_index))) & 0xFF) as u8; + } + 2..=5 => { + let shift_amount = 8 * (3 - (setting_byte_index - 2)); + *buf_item = ((value >> shift_amount) & 0xFF) as u8; + } + _ => { + return Err(FrameEncoderErr::InternalError); + } + } + } else { + return Err(FrameEncoderErr::InternalError); + } + } + Ok(()) + } + fn encode_priority_frame(&mut self, buf: &mut [u8]) -> Result { if let Some(frame) = &self.current_frame { if let Payload::Priority(_) = frame.payload() { @@ -909,33 +990,7 @@ impl FrameEncoder { }; let bytes_to_write = remaining_header_bytes.min(buf.len()); - for (buf_index, item) in buf.iter_mut().enumerate().take(bytes_to_write) { - let header_byte_index = self.encoded_bytes + buf_index; - match header_byte_index { - // The first 3 bytes represent the payload length in the frame header. - 0..=2 => { - let payload_len = 5; - *item = ((payload_len >> (16 - (8 * header_byte_index))) & 0xFF) as u8; - } - // The 4th byte represents the frame type in the frame header. - 3 => { - *item = frame.payload().frame_type() as u8; - } - // The 5th byte represents the frame flags in the frame header. - 4 => { - *item = frame.flags().bits(); - } - // The last 4 bytes (6th to 9th) represent the stream identifier in the - // frame header. - 5..=8 => { - let stream_id_byte_index = header_byte_index - 5; - *item = (frame.stream_id() >> (24 - (8 * stream_id_byte_index))) as u8; - } - _ => { - return Err(FrameEncoderErr::InternalError); - } - } - } + self.iterate_priority_header(frame, buf, bytes_to_write)?; self.encoded_bytes += bytes_to_write; if self.encoded_bytes == frame_header_size { self.state = FrameEncoderState::EncodingPriorityPayload; @@ -949,6 +1004,42 @@ impl FrameEncoder { } } + fn iterate_priority_header( + &self, + frame: &Frame, + buf: &mut [u8], + len: usize, + ) -> Result<(), FrameEncoderErr> { + for (buf_index, item) in buf.iter_mut().enumerate().take(len) { + let header_byte_index = self.encoded_bytes + buf_index; + match header_byte_index { + // The first 3 bytes represent the payload length in the frame header. + 0..=2 => { + let payload_len = 5; + *item = ((payload_len >> (16 - (8 * header_byte_index))) & 0xFF) as u8; + } + // The 4th byte represents the frame type in the frame header. + 3 => { + *item = frame.payload().frame_type() as u8; + } + // The 5th byte represents the frame flags in the frame header. + 4 => { + *item = frame.flags().bits(); + } + // The last 4 bytes (6th to 9th) represent the stream identifier in the + // frame header. + 5..=8 => { + let stream_id_byte_index = header_byte_index - 5; + *item = (frame.stream_id() >> (24 - (8 * stream_id_byte_index))) as u8; + } + _ => { + return Err(FrameEncoderErr::InternalError); + } + } + } + Ok(()) + } + fn encode_priority_payload(&mut self, buf: &mut [u8]) -> Result { if let Some(frame) = &self.current_frame { if let Payload::Priority(priority) = frame.payload() { @@ -957,31 +1048,7 @@ impl FrameEncoder { let remaining_payload_bytes = 5 - (self.encoded_bytes - frame_header_size); let bytes_to_write = remaining_payload_bytes.min(buf.len()); - for (buf_index, buf_item) in buf.iter_mut().enumerate().take(bytes_to_write) { - let payload_byte_index = self - .encoded_bytes - .saturating_sub(frame_header_size) - .saturating_add(buf_index); - match payload_byte_index { - 0 => { - *buf_item = (priority.get_exclusive() as u8) << 7 - | ((priority.get_stream_dependency() >> 24) & 0x7F) as u8; - } - 1..=3 => { - let stream_dependency_byte_index = payload_byte_index - 1; - *buf_item = (priority.get_stream_dependency() - >> (16 - (8 * stream_dependency_byte_index))) - as u8; - } - 4 => { - // The last byte is the weight. - *buf_item = priority.get_weight(); - } - _ => { - return Err(FrameEncoderErr::InternalError); - } - } - } + self.iterate_priority_payload(priority, buf, frame_header_size, bytes_to_write)?; self.encoded_bytes += bytes_to_write; if self.encoded_bytes == frame_header_size + 5 { self.state = FrameEncoderState::DataComplete @@ -996,6 +1063,41 @@ impl FrameEncoder { } } + fn iterate_priority_payload( + &self, + priority: &Priority, + buf: &mut [u8], + frame_header_size: usize, + len: usize, + ) -> Result<(), FrameEncoderErr> { + for (buf_index, buf_item) in buf.iter_mut().enumerate().take(len) { + let payload_byte_index = self + .encoded_bytes + .saturating_sub(frame_header_size) + .saturating_add(buf_index); + match payload_byte_index { + 0 => { + *buf_item = (priority.get_exclusive() as u8) << 7 + | ((priority.get_stream_dependency() >> 24) & 0x7F) as u8; + } + 1..=3 => { + let stream_dependency_byte_index = payload_byte_index - 1; + *buf_item = (priority.get_stream_dependency() + >> (16 - (8 * stream_dependency_byte_index))) + as u8; + } + 4 => { + // The last byte is the weight. + *buf_item = priority.get_weight(); + } + _ => { + return Err(FrameEncoderErr::InternalError); + } + } + } + Ok(()) + } + fn encode_rst_stream_frame(&mut self, buf: &mut [u8]) -> Result { if let Some(frame) = &self.current_frame { let frame_header_size = 9; @@ -1087,35 +1189,7 @@ impl FrameEncoder { frame_header_size - self.encoded_bytes }; let bytes_to_write = remaining_header_bytes.min(buf.len()); - for buf_index in 0..bytes_to_write { - let header_byte_index = self.encoded_bytes + buf_index; - match header_byte_index { - // The first 3 bytes represent the payload length in the frame header. - 0..=2 => { - // PING payload is always 8 bytes. - let payload_len = 8; - buf[buf_index] = ((payload_len >> (16 - (8 * buf_index))) & 0xFF) as u8; - } - // The 4th byte represents the frame type in the frame header. - 3 => { - buf[3] = FrameType::Ping as u8; - } - // The 5th byte represents the frame flags in the frame header. - 4 => { - buf[4] = frame.flags().bits(); - } - // The last 4 bytes (6th to 9th) represent the stream identifier in the - // frame header. For PING frames, this should always - // be 0. - 5..=8 => { - // Stream ID should be 0 for PING frames. - buf[buf_index] = 0; - } - _ => { - return Err(FrameEncoderErr::InternalError); - } - } - } + self.iterate_ping_header(frame, buf, bytes_to_write)?; self.encoded_bytes += bytes_to_write; if self.encoded_bytes == frame_header_size { self.state = FrameEncoderState::EncodingPingPayload; @@ -1129,6 +1203,44 @@ impl FrameEncoder { } } + fn iterate_ping_header( + &self, + frame: &Frame, + buf: &mut [u8], + len: usize, + ) -> Result<(), FrameEncoderErr> { + for buf_index in 0..len { + let header_byte_index = self.encoded_bytes + buf_index; + match header_byte_index { + // The first 3 bytes represent the payload length in the frame header. + 0..=2 => { + // PING payload is always 8 bytes. + let payload_len = 8; + buf[buf_index] = ((payload_len >> (16 - (8 * buf_index))) & 0xFF) as u8; + } + // The 4th byte represents the frame type in the frame header. + 3 => { + buf[3] = FrameType::Ping as u8; + } + // The 5th byte represents the frame flags in the frame header. + 4 => { + buf[4] = frame.flags().bits(); + } + // The last 4 bytes (6th to 9th) represent the stream identifier in the + // frame header. For PING frames, this should always + // be 0. + 5..=8 => { + // Stream ID should be 0 for PING frames. + buf[buf_index] = 0; + } + _ => { + return Err(FrameEncoderErr::InternalError); + } + } + } + Ok(()) + } + fn encode_ping_payload(&mut self, buf: &mut [u8]) -> Result { if let Some(frame) = &self.current_frame { if let Payload::Ping(ping) = frame.payload() { diff --git a/ylong_http/src/h2/hpack/table.rs b/ylong_http/src/h2/hpack/table.rs index 7774c1d..be646c2 100644 --- a/ylong_http/src/h2/hpack/table.rs +++ b/ylong_http/src/h2/hpack/table.rs @@ -342,57 +342,61 @@ impl StaticTable { (Header::Status, "404") => Some(TableIndex::Header(13)), (Header::Status, "500") => Some(TableIndex::Header(14)), (Header::Status, _) => Some(TableIndex::HeaderName(8)), - (Header::Other(s), v) => match (s.as_str(), v) { - ("accept-charset", _) => Some(TableIndex::HeaderName(15)), - ("accept-encoding", "gzip, deflate") => Some(TableIndex::Header(16)), - ("accept-encoding", _) => Some(TableIndex::HeaderName(16)), - ("accept-language", _) => Some(TableIndex::HeaderName(17)), - ("accept-ranges", _) => Some(TableIndex::HeaderName(18)), - ("accept", _) => Some(TableIndex::HeaderName(19)), - ("access-control-allow-origin", _) => Some(TableIndex::HeaderName(20)), - ("age", _) => Some(TableIndex::HeaderName(21)), - ("allow", _) => Some(TableIndex::HeaderName(22)), - ("authorization", _) => Some(TableIndex::HeaderName(23)), - ("cache-control", _) => Some(TableIndex::HeaderName(24)), - ("content-disposition", _) => Some(TableIndex::HeaderName(25)), - ("content-encoding", _) => Some(TableIndex::HeaderName(26)), - ("content-language", _) => Some(TableIndex::HeaderName(27)), - ("content-length", _) => Some(TableIndex::HeaderName(28)), - ("content-location", _) => Some(TableIndex::HeaderName(29)), - ("content-range", _) => Some(TableIndex::HeaderName(30)), - ("content-type", _) => Some(TableIndex::HeaderName(31)), - ("cookie", _) => Some(TableIndex::HeaderName(32)), - ("date", _) => Some(TableIndex::HeaderName(33)), - ("etag", _) => Some(TableIndex::HeaderName(34)), - ("expect", _) => Some(TableIndex::HeaderName(35)), - ("expires", _) => Some(TableIndex::HeaderName(36)), - ("from", _) => Some(TableIndex::HeaderName(37)), - ("host", _) => Some(TableIndex::HeaderName(38)), - ("if-match", _) => Some(TableIndex::HeaderName(39)), - ("if-modified-since", _) => Some(TableIndex::HeaderName(40)), - ("if-none-match", _) => Some(TableIndex::HeaderName(41)), - ("if-range", _) => Some(TableIndex::HeaderName(42)), - ("if-unmodified-since", _) => Some(TableIndex::HeaderName(43)), - ("last-modified", _) => Some(TableIndex::HeaderName(44)), - ("link", _) => Some(TableIndex::HeaderName(45)), - ("location", _) => Some(TableIndex::HeaderName(46)), - ("max-forwards", _) => Some(TableIndex::HeaderName(47)), - ("proxy-authenticate", _) => Some(TableIndex::HeaderName(48)), - ("proxy-authorization", _) => Some(TableIndex::HeaderName(49)), - ("range", _) => Some(TableIndex::HeaderName(50)), - ("referer", _) => Some(TableIndex::HeaderName(51)), - ("refresh", _) => Some(TableIndex::HeaderName(52)), - ("retry-after", _) => Some(TableIndex::HeaderName(53)), - ("server", _) => Some(TableIndex::HeaderName(54)), - ("set-cookie", _) => Some(TableIndex::HeaderName(55)), - ("strict-transport-security", _) => Some(TableIndex::HeaderName(56)), - ("transfer-encoding", _) => Some(TableIndex::HeaderName(57)), - ("user-agent", _) => Some(TableIndex::HeaderName(58)), - ("vary", _) => Some(TableIndex::HeaderName(59)), - ("via", _) => Some(TableIndex::HeaderName(60)), - ("www-authenticate", _) => Some(TableIndex::HeaderName(61)), - _ => None, - }, + (Header::Other(s), v) => Self::index_headers(s.as_str(), v), + } + } + + fn index_headers(key: &str, value: &str) -> Option { + match (key, value) { + ("accept-charset", _) => Some(TableIndex::HeaderName(15)), + ("accept-encoding", "gzip, deflate") => Some(TableIndex::Header(16)), + ("accept-encoding", _) => Some(TableIndex::HeaderName(16)), + ("accept-language", _) => Some(TableIndex::HeaderName(17)), + ("accept-ranges", _) => Some(TableIndex::HeaderName(18)), + ("accept", _) => Some(TableIndex::HeaderName(19)), + ("access-control-allow-origin", _) => Some(TableIndex::HeaderName(20)), + ("age", _) => Some(TableIndex::HeaderName(21)), + ("allow", _) => Some(TableIndex::HeaderName(22)), + ("authorization", _) => Some(TableIndex::HeaderName(23)), + ("cache-control", _) => Some(TableIndex::HeaderName(24)), + ("content-disposition", _) => Some(TableIndex::HeaderName(25)), + ("content-encoding", _) => Some(TableIndex::HeaderName(26)), + ("content-language", _) => Some(TableIndex::HeaderName(27)), + ("content-length", _) => Some(TableIndex::HeaderName(28)), + ("content-location", _) => Some(TableIndex::HeaderName(29)), + ("content-range", _) => Some(TableIndex::HeaderName(30)), + ("content-type", _) => Some(TableIndex::HeaderName(31)), + ("cookie", _) => Some(TableIndex::HeaderName(32)), + ("date", _) => Some(TableIndex::HeaderName(33)), + ("etag", _) => Some(TableIndex::HeaderName(34)), + ("expect", _) => Some(TableIndex::HeaderName(35)), + ("expires", _) => Some(TableIndex::HeaderName(36)), + ("from", _) => Some(TableIndex::HeaderName(37)), + ("host", _) => Some(TableIndex::HeaderName(38)), + ("if-match", _) => Some(TableIndex::HeaderName(39)), + ("if-modified-since", _) => Some(TableIndex::HeaderName(40)), + ("if-none-match", _) => Some(TableIndex::HeaderName(41)), + ("if-range", _) => Some(TableIndex::HeaderName(42)), + ("if-unmodified-since", _) => Some(TableIndex::HeaderName(43)), + ("last-modified", _) => Some(TableIndex::HeaderName(44)), + ("link", _) => Some(TableIndex::HeaderName(45)), + ("location", _) => Some(TableIndex::HeaderName(46)), + ("max-forwards", _) => Some(TableIndex::HeaderName(47)), + ("proxy-authenticate", _) => Some(TableIndex::HeaderName(48)), + ("proxy-authorization", _) => Some(TableIndex::HeaderName(49)), + ("range", _) => Some(TableIndex::HeaderName(50)), + ("referer", _) => Some(TableIndex::HeaderName(51)), + ("refresh", _) => Some(TableIndex::HeaderName(52)), + ("retry-after", _) => Some(TableIndex::HeaderName(53)), + ("server", _) => Some(TableIndex::HeaderName(54)), + ("set-cookie", _) => Some(TableIndex::HeaderName(55)), + ("strict-transport-security", _) => Some(TableIndex::HeaderName(56)), + ("transfer-encoding", _) => Some(TableIndex::HeaderName(57)), + ("user-agent", _) => Some(TableIndex::HeaderName(58)), + ("vary", _) => Some(TableIndex::HeaderName(59)), + ("via", _) => Some(TableIndex::HeaderName(60)), + ("www-authenticate", _) => Some(TableIndex::HeaderName(61)), + _ => None, } } } diff --git a/ylong_http/src/request/uri/mod.rs b/ylong_http/src/request/uri/mod.rs index 874d795..6358715 100644 --- a/ylong_http/src/request/uri/mod.rs +++ b/ylong_http/src/request/uri/mod.rs @@ -645,13 +645,15 @@ impl Authority { return Err(InvalidUri::UriMissAuthority); } let (authority, rest) = authority_token(bytes)?; - if !rest.is_empty() || authority.is_none() { - return Err(InvalidUri::InvalidAuthority); + if rest.is_empty() { + if let Some(auth) = authority { + return Ok(auth); + } } - Ok(authority.unwrap()) + Err(InvalidUri::InvalidAuthority) } - /// Gets a immutable reference to `Host`. + /// Gets an immutable reference to `Host`. /// /// # Examples /// @@ -1046,6 +1048,16 @@ fn authority_token(bytes: &[u8]) -> Result<(Option, &[u8]), InvalidUr } } } + authority_parse(bytes, end, colon_num, left_bracket, right_bracket) +} + +fn authority_parse( + bytes: &[u8], + end: usize, + colon_num: i32, + left_bracket: bool, + right_bracket: bool, +) -> Result<(Option, &[u8]), InvalidUri> { // The authority does not exist. if end == 0 { return Ok((None, &bytes[end..])); diff --git a/ylong_http_client/examples/async_http.rs b/ylong_http_client/examples/async_http.rs index c8b17e1..f2c415f 100644 --- a/ylong_http_client/examples/async_http.rs +++ b/ylong_http_client/examples/async_http.rs @@ -33,7 +33,7 @@ async fn client_send() -> Result<(), HttpClientError> { // Creates a `Request`. let request = Request::builder() - .url("127.0.0.1:3000") + .url("https://sf3-cn.feishucdn.com/obj/ee-appcenter/47273f95/Feishu-win32_ia32-7.9.7-signed.exe") .body(Body::empty())?; // Sends request and receives a `Response`. diff --git a/ylong_http_client/src/async_impl/conn/http1.rs b/ylong_http_client/src/async_impl/conn/http1.rs index 97b1c29..7d8d01f 100644 --- a/ylong_http_client/src/async_impl/conn/http1.rs +++ b/ylong_http_client/src/async_impl/conn/http1.rs @@ -12,18 +12,21 @@ // limitations under the License. use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; use ylong_http::body::async_impl::Body; use ylong_http::body::{ChunkBody, TextBody}; use ylong_http::h1::{RequestEncoder, ResponseDecoder}; use ylong_http::request::uri::Scheme; +use ylong_http::response::ResponsePart; use ylong_http::version::Version; use super::StreamData; use crate::async_impl::connector::ConnInfo; +use crate::async_impl::interceptor::Interceptors; use crate::async_impl::request::Message; -use crate::async_impl::{HttpBody, Response}; +use crate::async_impl::{HttpBody, Request, Response}; use crate::error::HttpClientError; use crate::runtime::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; use crate::util::dispatcher::http1::Http1Conn; @@ -46,33 +49,55 @@ where .intercept_request(message.request.ref_mut())?; let mut buf = vec![0u8; TEMP_BUF_SIZE]; - // Encodes and sends Request-line and Headers(non-body fields). - let mut part_encoder = RequestEncoder::new(message.request.ref_mut().part().clone()); - if conn.raw_mut().is_proxy() && message.request.ref_mut().uri().scheme() == Some(&Scheme::HTTP) - { - part_encoder.absolute_uri(true); - } - loop { - match part_encoder.encode(&mut buf[..]) { - Ok(0) => break, - Ok(written) => { - message.interceptor.intercept_input(&buf[..written])?; - // RequestEncoder writes `buf` as much as possible. - if let Err(e) = conn.raw_mut().write_all(&buf[..written]).await { + encode_request_part( + message.request.ref_mut(), + &message.interceptor, + &mut conn, + &mut buf, + ) + .await?; + encode_various_body(message.request.ref_mut(), &mut conn, &mut buf).await?; + + // Decodes response part. + let (part, pre) = { + let mut decoder = ResponseDecoder::new(); + loop { + let size = match conn.raw_mut().read(buf.as_mut_slice()).await { + Ok(0) => { + conn.shutdown(); + return err_from_msg!(Request, "Tcp closed"); + } + Ok(size) => size, + Err(e) => { conn.shutdown(); return err_from_io!(Request, e); } - } - Err(e) => { - conn.shutdown(); - return err_from_other!(Request, e); + }; + + message.interceptor.intercept_output(&buf[..size])?; + match decoder.decode(&buf[..size]) { + Ok(None) => {} + Ok(Some((part, rem))) => break (part, rem), + Err(e) => { + conn.shutdown(); + return err_from_other!(Request, e); + } } } - } + }; + + decode_response(message, part, conn, pre) +} - let content_length = message - .request - .ref_mut() +async fn encode_various_body( + request: &mut Request, + conn: &mut Http1Conn, + buf: &mut [u8], +) -> Result<(), HttpClientError> +where + S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static, +{ + let content_length = request .part() .headers .get("Content-Length") @@ -80,9 +105,7 @@ where .and_then(|v| v.parse::().ok()) .is_some(); - let transfer_encoding = message - .request - .ref_mut() + let transfer_encoding = request .part() .headers .get("Transfer-Encoding") @@ -90,51 +113,68 @@ where .map(|v| v.contains("chunked")) .unwrap_or(false); - let body = message.request.ref_mut().body_mut(); + let body = request.body_mut(); match (content_length, transfer_encoding) { (_, true) => { let body = ChunkBody::from_async_reader(body); - encode_body(&mut conn, body, &mut buf).await?; + encode_body(conn, body, buf).await?; } (true, false) => { let body = TextBody::from_async_reader(body); - encode_body(&mut conn, body, &mut buf).await?; + encode_body(conn, body, buf).await?; } (false, false) => { let body = TextBody::from_async_reader(body); - encode_body(&mut conn, body, &mut buf).await?; + encode_body(conn, body, buf).await?; } }; + Ok(()) +} - // Decodes response part. - let (part, pre) = { - let mut decoder = ResponseDecoder::new(); - loop { - let size = match conn.raw_mut().read(buf.as_mut_slice()).await { - Ok(0) => { - conn.shutdown(); - return err_from_msg!(Request, "Tcp closed"); - } - Ok(size) => size, - Err(e) => { +async fn encode_request_part( + request: &Request, + interceptor: &Arc, + conn: &mut Http1Conn, + buf: &mut [u8], +) -> Result<(), HttpClientError> +where + S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static, +{ + // Encodes and sends Request-line and Headers(non-body fields). + let mut part_encoder = RequestEncoder::new(request.part().clone()); + if conn.raw_mut().is_proxy() && request.uri().scheme() == Some(&Scheme::HTTP) { + part_encoder.absolute_uri(true); + } + loop { + match part_encoder.encode(&mut buf[..]) { + Ok(0) => break, + Ok(written) => { + interceptor.intercept_input(&buf[..written])?; + // RequestEncoder writes `buf` as much as possible. + if let Err(e) = conn.raw_mut().write_all(&buf[..written]).await { conn.shutdown(); return err_from_io!(Request, e); } - }; - - message.interceptor.intercept_output(&buf[..size])?; - match decoder.decode(&buf[..size]) { - Ok(None) => {} - Ok(Some((part, rem))) => break (part, rem), - Err(e) => { - conn.shutdown(); - return err_from_other!(Request, e); - } + } + Err(e) => { + conn.shutdown(); + return err_from_other!(Request, e); } } - }; + } + Ok(()) +} +fn decode_response( + mut message: Message, + part: ResponsePart, + conn: Http1Conn, + pre: &[u8], +) -> Result +where + S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static, +{ // The shutdown function only sets the current connection to the closed state // and does not release the connection immediately. // Instead, the connection will be completely closed diff --git a/ylong_http_client/src/async_impl/conn/http2.rs b/ylong_http_client/src/async_impl/conn/http2.rs index 165edd4..fc89271 100644 --- a/ylong_http_client/src/async_impl/conn/http2.rs +++ b/ylong_http_client/src/async_impl/conn/http2.rs @@ -260,12 +260,7 @@ where Some(Poll::Ready(Ok(()))) } else { buf.append_slice(&data[..fill_len]); - if frame.flags().is_end_stream() { - text_io.is_closed = true; - Some(Poll::Ready(Ok(()))) - } else { - None - } + Self::end_read(text_io, frame.flags().is_end_stream()) } } Payload::RstStream(reset) => { @@ -288,6 +283,15 @@ where } } + fn end_read(text_io: &mut TextIo, end_stream: bool) -> Option>> { + if end_stream { + text_io.is_closed = true; + Some(Poll::Ready(Ok(()))) + } else { + None + } + } + fn read_remaining_data( text_io: &mut TextIo, buf: &mut HttpReadBuf, diff --git a/ylong_http_client/src/async_impl/http_body.rs b/ylong_http_client/src/async_impl/http_body.rs index 82afdf6..a8fa85a 100644 --- a/ylong_http_client/src/async_impl/http_body.rs +++ b/ylong_http_client/src/async_impl/http_body.rs @@ -208,9 +208,7 @@ impl UntilClose { if buf.is_empty() { return Poll::Ready(Ok(0)); } - let mut read = 0; - if let Some(pre) = self.pre.as_mut() { // Here cursor read never failed. let this_read = Read::read(pre, buf).unwrap(); @@ -222,39 +220,50 @@ impl UntilClose { } if !buf[read..].is_empty() { - if let Some(mut io) = self.io.take() { - let mut read_buf = ReadBuf::new(&mut buf[read..]); - return match Pin::new(&mut io).poll_read(cx, &mut read_buf) { - // Disconnected. - Poll::Ready(Ok(())) => { - let filled = read_buf.filled().len(); - if filled == 0 { - io.shutdown(); - } else { - self.interceptors - .intercept_output(&buf[read..(read + filled)])?; - self.io = Some(io); - } - read += filled; - Poll::Ready(Ok(read)) - } - Poll::Pending => { - self.io = Some(io); - if read != 0 { - return Poll::Ready(Ok(read)); - } - Poll::Pending - } - Poll::Ready(Err(e)) => { - // If IO error occurs, shutdowns `io` before return. - io.shutdown(); - return Poll::Ready(err_from_io!(BodyTransfer, e)); - } - }; + if let Some(io) = self.io.take() { + return self.poll_read_io(cx, io, read, buf); } } Poll::Ready(Ok(read)) } + + fn poll_read_io( + &mut self, + cx: &mut Context<'_>, + mut io: BoxStreamData, + read: usize, + buf: &mut [u8], + ) -> Poll> { + let mut read = read; + let mut read_buf = ReadBuf::new(&mut buf[read..]); + match Pin::new(&mut io).poll_read(cx, &mut read_buf) { + // Disconnected. + Poll::Ready(Ok(())) => { + let filled = read_buf.filled().len(); + if filled == 0 { + io.shutdown(); + } else { + self.interceptors + .intercept_output(&buf[read..(read + filled)])?; + self.io = Some(io); + } + read += filled; + Poll::Ready(Ok(read)) + } + Poll::Pending => { + self.io = Some(io); + if read != 0 { + return Poll::Ready(Ok(read)); + } + Poll::Pending + } + Poll::Ready(Err(e)) => { + // If IO error occurs, shutdowns `io` before return. + io.shutdown(); + Poll::Ready(err_from_io!(BodyTransfer, e)) + } + } + } } struct Text { @@ -299,70 +308,90 @@ impl Text { self.pre = None; } else { read += this_read; - let (text, rem) = self.decoder.decode(&buf[..read]); - - // Contains redundant `rem`, return error. - match (text.is_complete(), rem.is_empty()) { - (true, false) => { - if let Some(io) = self.io.take() { - io.shutdown(); - }; - return Poll::Ready(err_from_msg!(BodyDecode, "Not eof")); - } - (true, true) => { - self.io = None; - return Poll::Ready(Ok(read)); - } - // TextBodyDecoder decodes as much as possible here. - _ => {} + if let Some(result) = self.read_remaining(buf, read) { + return result; } } } if !buf[read..].is_empty() { - if let Some(mut io) = self.io.take() { - let mut read_buf = ReadBuf::new(&mut buf[read..]); - match Pin::new(&mut io).poll_read(cx, &mut read_buf) { - // Disconnected. - Poll::Ready(Ok(())) => { - let filled = read_buf.filled().len(); - if filled == 0 { - io.shutdown(); - return Poll::Ready(err_from_msg!( - BodyDecode, - "Response body incomplete" - )); - } - let (text, rem) = self.decoder.decode(read_buf.filled()); - self.interceptors.intercept_output(read_buf.filled())?; - read += filled; - // Contains redundant `rem`, return error. - match (text.is_complete(), rem.is_empty()) { - (true, false) => { - io.shutdown(); - return Poll::Ready(err_from_msg!(BodyDecode, "Not eof")); - } - (true, true) => return Poll::Ready(Ok(read)), - _ => {} - } - self.io = Some(io); + if let Some(io) = self.io.take() { + return self.poll_read_io(cx, buf, io, read); + } + } + Poll::Ready(Ok(read)) + } + + fn read_remaining( + &mut self, + buf: &mut [u8], + read: usize, + ) -> Option>> { + let (text, rem) = self.decoder.decode(&buf[..read]); + + // Contains redundant `rem`, return error. + match (text.is_complete(), rem.is_empty()) { + (true, false) => { + if let Some(io) = self.io.take() { + io.shutdown(); + }; + Some(Poll::Ready(err_from_msg!(BodyDecode, "Not eof"))) + } + (true, true) => { + self.io = None; + Some(Poll::Ready(Ok(read))) + } + // TextBodyDecoder decodes as much as possible here. + _ => None, + } + } + + fn poll_read_io( + &mut self, + cx: &mut Context<'_>, + buf: &mut [u8], + mut io: BoxStreamData, + read: usize, + ) -> Poll> { + let mut read = read; + let mut read_buf = ReadBuf::new(&mut buf[read..]); + match Pin::new(&mut io).poll_read(cx, &mut read_buf) { + // Disconnected. + Poll::Ready(Ok(())) => { + let filled = read_buf.filled().len(); + if filled == 0 { + io.shutdown(); + return Poll::Ready(err_from_msg!(BodyDecode, "Response body incomplete")); + } + let (text, rem) = self.decoder.decode(read_buf.filled()); + self.interceptors.intercept_output(read_buf.filled())?; + read += filled; + // Contains redundant `rem`, return error. + match (text.is_complete(), rem.is_empty()) { + (true, false) => { + io.shutdown(); + Poll::Ready(err_from_msg!(BodyDecode, "Not eof")) } - Poll::Pending => { + (true, true) => Poll::Ready(Ok(read)), + _ => { self.io = Some(io); - if read != 0 { - return Poll::Ready(Ok(read)); - } - return Poll::Pending; - } - Poll::Ready(Err(e)) => { - // If IO error occurs, shutdowns `io` before return. - io.shutdown(); - return Poll::Ready(err_from_io!(BodyDecode, e)); + Poll::Ready(Ok(read)) } } } + Poll::Pending => { + self.io = Some(io); + if read != 0 { + return Poll::Ready(Ok(read)); + } + Poll::Pending + } + Poll::Ready(Err(e)) => { + // If IO error occurs, shutdowns `io` before return. + io.shutdown(); + Poll::Ready(err_from_io!(BodyDecode, e)) + } } - Poll::Ready(Ok(read)) } } diff --git a/ylong_http_client/src/util/c_openssl/bio.rs b/ylong_http_client/src/util/c_openssl/bio.rs index 7565af9..a24dbd2 100644 --- a/ylong_http_client/src/util/c_openssl/bio.rs +++ b/ylong_http_client/src/util/c_openssl/bio.rs @@ -229,6 +229,25 @@ unsafe extern "C" fn destroy(bio: *mut BIO) -> c_int { 1 } +macro_rules! catch_unwind_bio { + ($io: expr, $flag: expr, $bio: expr, $state: expr) => { + match catch_unwind(AssertUnwindSafe(|| $io)) { + Ok(Err(err)) => { + if retry_error(&err) { + BIO_set_flags($bio, BIO_FLAGS_SHOULD_RETRY | $flag) + } + $state.error = Some(err); + -1 + } + Ok(Ok(len)) => len as c_int, + Err(err) => { + $state.panic = Some(err); + -1 + } + } + }; +} + unsafe extern "C" fn bwrite(bio: *mut BIO, buf: *const c_char, len: c_int) -> c_int { BIO_clear_flags(bio, BIO_FLAGS_SHOULD_RETRY | BIO_FLAGS_RWS); @@ -239,21 +258,7 @@ unsafe extern "C" fn bwrite(bio: *mut BIO, buf: *const c_char, len: c_ } let buf = slice::from_raw_parts(buf as *const _, len as usize); - - match catch_unwind(AssertUnwindSafe(|| state.stream.write(buf))) { - Ok(Err(err)) => { - if retry_error(&err) { - BIO_set_flags(bio, BIO_FLAGS_SHOULD_RETRY | BIO_FLAGS_WRITE) - } - state.error = Some(err); - -1 - } - Ok(Ok(len)) => len as c_int, - Err(err) => { - state.panic = Some(err); - -1 - } - } + catch_unwind_bio!(state.stream.write(buf), BIO_FLAGS_WRITE, bio, state) } unsafe extern "C" fn bread(bio: *mut BIO, buf: *mut c_char, len: c_int) -> c_int { @@ -262,20 +267,7 @@ unsafe extern "C" fn bread(bio: *mut BIO, buf: *mut c_char, len: c_int) let state = get_state::(bio); let buf = slice::from_raw_parts_mut(buf as *mut _, len as usize); - match catch_unwind(AssertUnwindSafe(|| state.stream.read(buf))) { - Ok(Err(err)) => { - if retry_error(&err) { - BIO_set_flags(bio, BIO_FLAGS_SHOULD_RETRY | BIO_FLAGS_READ) - } - state.error = Some(err); - -1 - } - Ok(Ok(len)) => len as c_int, - Err(err) => { - state.panic = Some(err); - -1 - } - } + catch_unwind_bio!(state.stream.read(buf), BIO_FLAGS_READ, bio, state) } unsafe extern "C" fn bputs(bio: *mut BIO, buf: *const c_char) -> c_int { diff --git a/ylong_http_client/src/util/c_openssl/ssl/ctx.rs b/ylong_http_client/src/util/c_openssl/ssl/ctx.rs index dd8d36c..13fbd4c 100644 --- a/ylong_http_client/src/util/c_openssl/ssl/ctx.rs +++ b/ylong_http_client/src/util/c_openssl/ssl/ctx.rs @@ -148,16 +148,8 @@ impl SslContextBuilder { where P: AsRef, { - let path = match file.as_ref().as_os_str().to_str() { - Some(path) => path, - None => return Err(ErrorStack::get()), - }; - let file = match CString::new(path) { - Ok(path) => path, - Err(_) => return Err(ErrorStack::get()), - }; + let file = Self::get_c_file(file)?; let ptr = self.as_ptr_mut(); - check_ret(unsafe { SSL_CTX_load_verify_locations(ptr, file.as_ptr() as *const _, ptr::null()) }) @@ -200,16 +192,8 @@ impl SslContextBuilder { where P: AsRef, { - let path = match file.as_ref().as_os_str().to_str() { - Some(path) => path, - None => return Err(ErrorStack::get()), - }; - let file = match CString::new(path) { - Ok(path) => path, - Err(_) => return Err(ErrorStack::get()), - }; + let file = Self::get_c_file(file)?; let ptr = self.as_ptr_mut(); - check_ret(unsafe { SSL_CTX_use_certificate_file(ptr, file.as_ptr() as *const _, file_type.as_raw()) }) @@ -222,6 +206,16 @@ impl SslContextBuilder { /// followed by intermediate CA certificates if applicable, and ending /// at the highest level (root) CA. pub(crate) fn set_certificate_chain_file

(&mut self, file: P) -> Result<(), ErrorStack> + where + P: AsRef, + { + let file = Self::get_c_file(file)?; + let ptr = self.as_ptr_mut(); + check_ret(unsafe { SSL_CTX_use_certificate_chain_file(ptr, file.as_ptr() as *const _) }) + .map(|_| ()) + } + + pub(crate) fn get_c_file

(file: P) -> Result where P: AsRef, { @@ -229,14 +223,10 @@ impl SslContextBuilder { Some(path) => path, None => return Err(ErrorStack::get()), }; - let file = match CString::new(path) { - Ok(path) => path, - Err(_) => return Err(ErrorStack::get()), - }; - let ptr = self.as_ptr_mut(); - - check_ret(unsafe { SSL_CTX_use_certificate_chain_file(ptr, file.as_ptr() as *const _) }) - .map(|_| ()) + match CString::new(path) { + Ok(path) => Ok(path), + Err(_) => Err(ErrorStack::get()), + } } /// Sets the protocols to sent to the server for Application Layer Protocol diff --git a/ylong_http_client/src/util/dispatcher.rs b/ylong_http_client/src/util/dispatcher.rs index 698d27b..94fcd55 100644 --- a/ylong_http_client/src/util/dispatcher.rs +++ b/ylong_http_client/src/util/dispatcher.rs @@ -279,70 +279,21 @@ pub(crate) mod http2 { flow.setup_recv_window(config.conn_window_size()); let streams = Streams::new(config.stream_window_size(), DEFAULT_WINDOW_SIZE, flow); - - let encoder = FrameEncoder::new(DEFAULT_MAX_FRAME_SIZE, DEFAULT_MAX_HEADER_LIST_SIZE); - let decoder = FrameDecoder::new(); - - let (read, write) = crate::runtime::split(io); - let shutdown_flag = Arc::new(AtomicBool::new(false)); - let settings_sync = Arc::new(Mutex::new(SettingsSync::default())); - let controller = StreamController::new(streams, shutdown_flag.clone()); // The id of the client stream, starting from 1 let next_stream_id = StreamId { id: AtomicU32::new(1), }; - let (input_tx, input_rx) = unbounded_channel(); - let (resp_tx, resp_rx) = unbounded_channel(); let (req_tx, req_rx) = unbounded_channel(); - match input_tx.send(settings) { - Ok(_) => { - let send_settings_sync = settings_sync.clone(); - let _send = crate::runtime::spawn(async move { - let mut writer = write; - match async_send_preface(&mut writer).await { - Ok(_) => { - let mut send = - SendData::new(encoder, send_settings_sync, writer, input_rx); - match Pin::new(&mut send).await { - Ok(_) => {} - Err(_e) => {} - } - } - Err(_e) => {} - } - }); - - let recv_settings_sync = settings_sync.clone(); - let _recv = crate::runtime::spawn(async move { - let mut recv = RecvData::new(decoder, recv_settings_sync, read, resp_tx); - match Pin::new(&mut recv).await { - Ok(_) => {} - Err(_e) => {} - } - }); - - let _manager = crate::runtime::spawn(async move { - let mut conn_manager = - ConnManager::new(settings_sync, input_tx, resp_rx, req_rx, controller); - match Pin::new(&mut conn_manager).await { - Ok(_) => {} - Err(e) => { - conn_manager.exit_with_error(e); - } - } - }); - } - Err(_e) => { - // Error is not possible, so it is not handled for the time - // being. - } + // Error is not possible, so it is not handled for the time + // being. + if input_tx.send(settings).is_ok() { + Self::launch(controller, req_rx, input_tx, input_rx, io); } - Self { next_stream_id, sender: req_tx, @@ -350,6 +301,43 @@ pub(crate) mod http2 { _mark: PhantomData, } } + + fn launch( + controller: StreamController, + req_rx: UnboundedReceiver, + input_tx: UnboundedSender, + input_rx: UnboundedReceiver, + io: S, + ) { + let (resp_tx, resp_rx) = unbounded_channel(); + let (read, write) = crate::runtime::split(io); + let settings_sync = Arc::new(Mutex::new(SettingsSync::default())); + let send_settings_sync = settings_sync.clone(); + let _send = crate::runtime::spawn(async move { + let mut writer = write; + if async_send_preface(&mut writer).await.is_ok() { + let encoder = + FrameEncoder::new(DEFAULT_MAX_FRAME_SIZE, DEFAULT_MAX_HEADER_LIST_SIZE); + let mut send = SendData::new(encoder, send_settings_sync, writer, input_rx); + let _ = Pin::new(&mut send).await; + } + }); + + let recv_settings_sync = settings_sync.clone(); + let _recv = crate::runtime::spawn(async move { + let decoder = FrameDecoder::new(); + let mut recv = RecvData::new(decoder, recv_settings_sync, read, resp_tx); + let _ = Pin::new(&mut recv).await; + }); + + let _manager = crate::runtime::spawn(async move { + let mut conn_manager = + ConnManager::new(settings_sync, input_tx, resp_rx, req_rx, controller); + if let Err(e) = Pin::new(&mut conn_manager).await { + conn_manager.exit_with_error(e); + } + }); + } } impl Dispatcher for Http2Dispatcher { diff --git a/ylong_http_client/src/util/proxy.rs b/ylong_http_client/src/util/proxy.rs index 09d8b28..c45942f 100644 --- a/ylong_http_client/src/util/proxy.rs +++ b/ylong_http_client/src/util/proxy.rs @@ -227,13 +227,9 @@ impl NoProxy { } fn contains_ip(&self, ip: IpAddr) -> bool { - for block_ip in self.ips.iter() { - match block_ip { - Ip::Address(i) => { - if &ip == i { - return true; - } - } + for Ip::Address(i) in self.ips.iter() { + if &ip == i { + return true; } } false -- Gitee