diff --git a/ylong_http_client/src/async_impl/conn/http1.rs b/ylong_http_client/src/async_impl/conn/http1.rs index 40e115c97f2f25cac437475411e9dc544b45c483..b6791a9e834f21a7fff77f8ed43667d2ed0a2e47 100644 --- a/ylong_http_client/src/async_impl/conn/http1.rs +++ b/ylong_http_client/src/async_impl/conn/http1.rs @@ -56,7 +56,7 @@ where .ref_mut() .time_group_mut() .set_transfer_start(Instant::now()); - conn.running(true); + let mut guard = conn.cancel_guard(); encode_request_part( message.request.ref_mut(), &message.interceptor, @@ -102,7 +102,9 @@ where } } }; - conn.running(false); + guard.normal_end(); + // if task cancel occurs, we should shutdown io + drop(guard); decode_response(message, part, conn, pre) } diff --git a/ylong_http_client/src/async_impl/pool.rs b/ylong_http_client/src/async_impl/pool.rs index ded7cffac0cf31ef51554170cf735651001386c0..427aee1c1122424f9d87c6f1f11073f753793905 100644 --- a/ylong_http_client/src/async_impl/pool.rs +++ b/ylong_http_client/src/async_impl/pool.rs @@ -306,16 +306,7 @@ impl Conns let mut data = stream.conn_data(); let time_group = take(data.time_group_mut()); - let protocol = if let Some(bytes) = data.negotiate().alpn() { - bytes - } else { - let dispatcher = ConnDispatcher::http1(stream); - return Ok(TimeInfoConn::new( - self.dispatch_h1_conn(dispatcher, permit), - time_group, - )); - }; - + let protocol = data.negotiate().alpn().unwrap_or(b"http/1.1"); if protocol == b"http/1.1" { let dispatcher = ConnDispatcher::http1(stream); Ok(TimeInfoConn::new( @@ -448,8 +439,7 @@ impl Conns // TODO Distinguish between http2 connections and http1 connections. for dispatcher in curr.into_iter() { // Discard invalid dispatchers. - // Running dispatchers means tcp canceled while read and write. - if dispatcher.is_shutdown() || dispatcher.is_running() { + if dispatcher.is_shutdown() { continue; } if conn.is_none() { diff --git a/ylong_http_client/src/async_impl/timeout.rs b/ylong_http_client/src/async_impl/timeout.rs index 3f3ba11f24c44634346b4555735fbd559e89d383..c3af42893572b2389067fca2ac881e46dddf8e18 100644 --- a/ylong_http_client/src/async_impl/timeout.rs +++ b/ylong_http_client/src/async_impl/timeout.rs @@ -69,19 +69,19 @@ where #[cfg(all(test, feature = "ylong_base"))] mod ut_timeout { + use std::pin::Pin; use std::sync::Arc; + use ylong_http::response::status::StatusCode; use ylong_http::response::{Response, ResponsePart}; use ylong_http::version::Version; use crate::async_impl::timeout::TimeoutFuture; use crate::async_impl::HttpBody; + use crate::runtime::Sleep; use crate::util::interceptor::IdleInterceptor; use crate::util::normalizer::BodyLength; use crate::HttpClientError; - use std::pin::Pin; - - use crate::runtime::Sleep; /// UT test cases for `TimeoutFuture`. /// diff --git a/ylong_http_client/src/util/dispatcher.rs b/ylong_http_client/src/util/dispatcher.rs index 4e8f5dc0d8cbfa8a2ac2efb57bdc187ce3558a5c..e1d0ec51886fe6dc165d04e8c5b1e0222660323d 100644 --- a/ylong_http_client/src/util/dispatcher.rs +++ b/ylong_http_client/src/util/dispatcher.rs @@ -23,8 +23,6 @@ pub(crate) trait Dispatcher { #[allow(dead_code)] fn is_goaway(&self) -> bool; - - fn is_running(&self) -> bool; } pub(crate) enum ConnDispatcher { @@ -79,19 +77,6 @@ impl Dispatcher for ConnDispatcher { Self::Http3(h3) => h3.is_goaway(), } } - - fn is_running(&self) -> bool { - match self { - #[cfg(feature = "http1_1")] - Self::Http1(h1) => h1.is_running(), - - #[cfg(feature = "http2")] - Self::Http2(h2) => h2.is_running(), - - #[cfg(feature = "http3")] - Self::Http3(h3) => h3.is_running(), - } - } } pub(crate) enum Conn { @@ -173,8 +158,6 @@ pub(crate) mod http1 { pub(crate) occupied: AtomicBool, // `shutdown` indicates that the connection need to be shut down. pub(crate) shutdown: AtomicBool, - // `running` indicates that the connection is cancelled during use. - pub(crate) running: AtomicBool, } unsafe impl Sync for Inner {} @@ -186,7 +169,6 @@ pub(crate) mod http1 { io: UnsafeCell::new(io), occupied: AtomicBool::new(false), shutdown: AtomicBool::new(false), - running: AtomicBool::new(false), }), } } @@ -210,10 +192,6 @@ pub(crate) mod http1 { fn is_goaway(&self) -> bool { false } - - fn is_running(&self) -> bool { - self.inner.running.load(Ordering::Relaxed) - } } /// Handle returned to other threads for I/O operations. @@ -246,8 +224,11 @@ pub(crate) mod http1 { self.inner.shutdown.store(true, Ordering::Release); } - pub(crate) fn running(&self, is_run:bool) { - self.inner.running.store(is_run, Ordering::Release); + pub(crate) fn cancel_guard(&self) -> CancelGuard { + CancelGuard { + inner: self.inner.clone(), + running: true, + } } } @@ -257,6 +238,29 @@ pub(crate) mod http1 { } } + /// Http1 cancel guard + pub(crate) struct CancelGuard { + inner: Arc>, + /// Default true + running: bool, + } + + impl CancelGuard { + pub(crate) fn normal_end(&mut self) { + self.running = false + } + } + + impl Drop for CancelGuard { + fn drop(&mut self) { + // When a drop occurs, if running is still true, it means a cancel has occurred, + // and the IO needs to be shutdown to prevent the reuse of dirty data + if self.running { + self.inner.shutdown.store(true, Ordering::Release); + } + } + } + pub(crate) struct WrappedSemaphore { sem: Arc, } @@ -552,10 +556,6 @@ pub(crate) mod http2 { fn is_goaway(&self) -> bool { self.io_goaway.load(Ordering::Relaxed) } - - fn is_running(&self) -> bool { - todo!() - } } impl Drop for Http2Dispatcher { @@ -1051,10 +1051,6 @@ pub(crate) mod http3 { fn is_goaway(&self) -> bool { self.io_goaway.load(Ordering::Relaxed) } - - fn is_running(&self) -> bool { - todo!() - } } impl Drop for Http3Dispatcher {