From d763895a67f008a0672ea39dab0c4861b9e1de8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E6=9C=AA=E6=9D=A5?= Date: Mon, 24 Mar 2025 20:34:58 +0800 Subject: [PATCH] safe cancel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 徐未来 --- .../src/async_impl/conn/http1.rs | 2 ++ ylong_http_client/src/async_impl/pool.rs | 3 +- ylong_http_client/src/util/dispatcher.rs | 34 +++++++++++++++++++ 3 files changed, 38 insertions(+), 1 deletion(-) diff --git a/ylong_http_client/src/async_impl/conn/http1.rs b/ylong_http_client/src/async_impl/conn/http1.rs index 0c66960..687e959 100644 --- a/ylong_http_client/src/async_impl/conn/http1.rs +++ b/ylong_http_client/src/async_impl/conn/http1.rs @@ -53,6 +53,7 @@ where .ref_mut() .time_group_mut() .set_transfer_start(Instant::now()); + conn.running(true); encode_request_part( message.request.ref_mut(), &message.interceptor, @@ -103,6 +104,7 @@ where } } }; + conn.running(false); decode_response(message, part, conn, pre) } diff --git a/ylong_http_client/src/async_impl/pool.rs b/ylong_http_client/src/async_impl/pool.rs index 1106c89..867af28 100644 --- a/ylong_http_client/src/async_impl/pool.rs +++ b/ylong_http_client/src/async_impl/pool.rs @@ -429,7 +429,8 @@ impl Conns // TODO Distinguish between http2 connections and http1 connections. for dispatcher in curr.into_iter() { // Discard invalid dispatchers. - if dispatcher.is_shutdown() { + // Running dispatchers means tcp canceled while read and write. + if dispatcher.is_shutdown() || dispatcher.is_running() { continue; } if conn.is_none() { diff --git a/ylong_http_client/src/util/dispatcher.rs b/ylong_http_client/src/util/dispatcher.rs index c675e95..0cee57b 100644 --- a/ylong_http_client/src/util/dispatcher.rs +++ b/ylong_http_client/src/util/dispatcher.rs @@ -23,6 +23,8 @@ pub(crate) trait Dispatcher { #[allow(dead_code)] fn is_goaway(&self) -> bool; + + fn is_running(&self) -> bool; } pub(crate) enum ConnDispatcher { @@ -77,6 +79,19 @@ impl Dispatcher for ConnDispatcher { Self::Http3(h3) => h3.is_goaway(), } } + + fn is_running(&self) -> bool { + match self { + #[cfg(feature = "http1_1")] + Self::Http1(h1) => h1.is_running(), + + #[cfg(feature = "http2")] + Self::Http2(h2) => h2.is_running(), + + #[cfg(feature = "http3")] + Self::Http3(h3) => h3.is_running(), + } + } } pub(crate) enum Conn { @@ -157,6 +172,8 @@ pub(crate) mod http1 { pub(crate) occupied: AtomicBool, // `shutdown` indicates that the connection need to be shut down. pub(crate) shutdown: AtomicBool, + // `running` indicates that the connection is cancelled during use. + pub(crate) running: AtomicBool, } unsafe impl Sync for Inner {} @@ -168,6 +185,7 @@ pub(crate) mod http1 { io: UnsafeCell::new(io), occupied: AtomicBool::new(false), shutdown: AtomicBool::new(false), + running: AtomicBool::new(false), }), } } @@ -191,6 +209,10 @@ pub(crate) mod http1 { fn is_goaway(&self) -> bool { false } + + fn is_running(&self) -> bool { + self.inner.running.load(Ordering::Relaxed) + } } /// Handle returned to other threads for I/O operations. @@ -217,6 +239,10 @@ pub(crate) mod http1 { pub(crate) fn shutdown(&self) { self.inner.shutdown.store(true, Ordering::Release); } + + pub(crate) fn running(&self, is_run:bool) { + self.inner.running.store(is_run, Ordering::Release); + } } impl Drop for Http1Conn { @@ -517,6 +543,10 @@ pub(crate) mod http2 { fn is_goaway(&self) -> bool { self.io_goaway.load(Ordering::Relaxed) } + + fn is_running(&self) -> bool { + todo!() + } } impl Drop for Http2Dispatcher { @@ -1008,6 +1038,10 @@ pub(crate) mod http3 { fn is_goaway(&self) -> bool { self.io_goaway.load(Ordering::Relaxed) } + + fn is_running(&self) -> bool { + todo!() + } } impl Drop for Http3Dispatcher { -- Gitee