From 921666d54904946da36fb241d9cf5d278839c214 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E6=9C=AA=E6=9D=A5?= Date: Mon, 19 May 2025 10:33:43 +0800 Subject: [PATCH] safe cancel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 徐未来 --- .../src/async_impl/conn/http1.rs | 2 ++ ylong_http_client/src/async_impl/pool.rs | 3 +- ylong_http_client/src/util/dispatcher.rs | 34 +++++++++++++++++++ 3 files changed, 38 insertions(+), 1 deletion(-) diff --git a/ylong_http_client/src/async_impl/conn/http1.rs b/ylong_http_client/src/async_impl/conn/http1.rs index 8fb4470..40e115c 100644 --- a/ylong_http_client/src/async_impl/conn/http1.rs +++ b/ylong_http_client/src/async_impl/conn/http1.rs @@ -56,6 +56,7 @@ where .ref_mut() .time_group_mut() .set_transfer_start(Instant::now()); + conn.running(true); encode_request_part( message.request.ref_mut(), &message.interceptor, @@ -101,6 +102,7 @@ where } } }; + conn.running(false); decode_response(message, part, conn, pre) } diff --git a/ylong_http_client/src/async_impl/pool.rs b/ylong_http_client/src/async_impl/pool.rs index a8a5a7f..ded7cff 100644 --- a/ylong_http_client/src/async_impl/pool.rs +++ b/ylong_http_client/src/async_impl/pool.rs @@ -448,7 +448,8 @@ impl Conns // TODO Distinguish between http2 connections and http1 connections. for dispatcher in curr.into_iter() { // Discard invalid dispatchers. - if dispatcher.is_shutdown() { + // Running dispatchers means tcp canceled while read and write. + if dispatcher.is_shutdown() || dispatcher.is_running() { continue; } if conn.is_none() { diff --git a/ylong_http_client/src/util/dispatcher.rs b/ylong_http_client/src/util/dispatcher.rs index cee41ef..4e8f5dc 100644 --- a/ylong_http_client/src/util/dispatcher.rs +++ b/ylong_http_client/src/util/dispatcher.rs @@ -23,6 +23,8 @@ pub(crate) trait Dispatcher { #[allow(dead_code)] fn is_goaway(&self) -> bool; + + fn is_running(&self) -> bool; } pub(crate) enum ConnDispatcher { @@ -77,6 +79,19 @@ impl Dispatcher for ConnDispatcher { Self::Http3(h3) => h3.is_goaway(), } } + + fn is_running(&self) -> bool { + match self { + #[cfg(feature = "http1_1")] + Self::Http1(h1) => h1.is_running(), + + #[cfg(feature = "http2")] + Self::Http2(h2) => h2.is_running(), + + #[cfg(feature = "http3")] + Self::Http3(h3) => h3.is_running(), + } + } } pub(crate) enum Conn { @@ -158,6 +173,8 @@ pub(crate) mod http1 { pub(crate) occupied: AtomicBool, // `shutdown` indicates that the connection need to be shut down. pub(crate) shutdown: AtomicBool, + // `running` indicates that the connection is cancelled during use. + pub(crate) running: AtomicBool, } unsafe impl Sync for Inner {} @@ -169,6 +186,7 @@ pub(crate) mod http1 { io: UnsafeCell::new(io), occupied: AtomicBool::new(false), shutdown: AtomicBool::new(false), + running: AtomicBool::new(false), }), } } @@ -192,6 +210,10 @@ pub(crate) mod http1 { fn is_goaway(&self) -> bool { false } + + fn is_running(&self) -> bool { + self.inner.running.load(Ordering::Relaxed) + } } /// Handle returned to other threads for I/O operations. @@ -223,6 +245,10 @@ pub(crate) mod http1 { pub(crate) fn shutdown(&self) { self.inner.shutdown.store(true, Ordering::Release); } + + pub(crate) fn running(&self, is_run:bool) { + self.inner.running.store(is_run, Ordering::Release); + } } impl Drop for Http1Conn { @@ -526,6 +552,10 @@ pub(crate) mod http2 { fn is_goaway(&self) -> bool { self.io_goaway.load(Ordering::Relaxed) } + + fn is_running(&self) -> bool { + todo!() + } } impl Drop for Http2Dispatcher { @@ -1021,6 +1051,10 @@ pub(crate) mod http3 { fn is_goaway(&self) -> bool { self.io_goaway.load(Ordering::Relaxed) } + + fn is_running(&self) -> bool { + todo!() + } } impl Drop for Http3Dispatcher { -- Gitee