diff --git a/ylong_http_client/src/async_impl/conn/http1.rs b/ylong_http_client/src/async_impl/conn/http1.rs index 8fb44703545086b740b1faf778b8be4e9fb01d97..40e115c97f2f25cac437475411e9dc544b45c483 100644 --- a/ylong_http_client/src/async_impl/conn/http1.rs +++ b/ylong_http_client/src/async_impl/conn/http1.rs @@ -56,6 +56,7 @@ where .ref_mut() .time_group_mut() .set_transfer_start(Instant::now()); + conn.running(true); encode_request_part( message.request.ref_mut(), &message.interceptor, @@ -101,6 +102,7 @@ where } } }; + conn.running(false); decode_response(message, part, conn, pre) } diff --git a/ylong_http_client/src/async_impl/pool.rs b/ylong_http_client/src/async_impl/pool.rs index a8a5a7fbb6bccaa10241f7a8fa06d979e4369c0c..ded7cffac0cf31ef51554170cf735651001386c0 100644 --- a/ylong_http_client/src/async_impl/pool.rs +++ b/ylong_http_client/src/async_impl/pool.rs @@ -448,7 +448,8 @@ impl Conns // TODO Distinguish between http2 connections and http1 connections. for dispatcher in curr.into_iter() { // Discard invalid dispatchers. - if dispatcher.is_shutdown() { + // Running dispatchers means tcp canceled while read and write. + if dispatcher.is_shutdown() || dispatcher.is_running() { continue; } if conn.is_none() { diff --git a/ylong_http_client/src/util/dispatcher.rs b/ylong_http_client/src/util/dispatcher.rs index cee41efe5bffaafc9a215f7b2cdfad9428f3cee4..4e8f5dc0d8cbfa8a2ac2efb57bdc187ce3558a5c 100644 --- a/ylong_http_client/src/util/dispatcher.rs +++ b/ylong_http_client/src/util/dispatcher.rs @@ -23,6 +23,8 @@ pub(crate) trait Dispatcher { #[allow(dead_code)] fn is_goaway(&self) -> bool; + + fn is_running(&self) -> bool; } pub(crate) enum ConnDispatcher { @@ -77,6 +79,19 @@ impl Dispatcher for ConnDispatcher { Self::Http3(h3) => h3.is_goaway(), } } + + fn is_running(&self) -> bool { + match self { + #[cfg(feature = "http1_1")] + Self::Http1(h1) => h1.is_running(), + + #[cfg(feature = "http2")] + Self::Http2(h2) => h2.is_running(), + + #[cfg(feature = "http3")] + Self::Http3(h3) => h3.is_running(), + } + } } pub(crate) enum Conn { @@ -158,6 +173,8 @@ pub(crate) mod http1 { pub(crate) occupied: AtomicBool, // `shutdown` indicates that the connection need to be shut down. pub(crate) shutdown: AtomicBool, + // `running` indicates that the connection is cancelled during use. + pub(crate) running: AtomicBool, } unsafe impl Sync for Inner {} @@ -169,6 +186,7 @@ pub(crate) mod http1 { io: UnsafeCell::new(io), occupied: AtomicBool::new(false), shutdown: AtomicBool::new(false), + running: AtomicBool::new(false), }), } } @@ -192,6 +210,10 @@ pub(crate) mod http1 { fn is_goaway(&self) -> bool { false } + + fn is_running(&self) -> bool { + self.inner.running.load(Ordering::Relaxed) + } } /// Handle returned to other threads for I/O operations. @@ -223,6 +245,10 @@ pub(crate) mod http1 { pub(crate) fn shutdown(&self) { self.inner.shutdown.store(true, Ordering::Release); } + + pub(crate) fn running(&self, is_run:bool) { + self.inner.running.store(is_run, Ordering::Release); + } } impl Drop for Http1Conn { @@ -526,6 +552,10 @@ pub(crate) mod http2 { fn is_goaway(&self) -> bool { self.io_goaway.load(Ordering::Relaxed) } + + fn is_running(&self) -> bool { + todo!() + } } impl Drop for Http2Dispatcher { @@ -1021,6 +1051,10 @@ pub(crate) mod http3 { fn is_goaway(&self) -> bool { self.io_goaway.load(Ordering::Relaxed) } + + fn is_running(&self) -> bool { + todo!() + } } impl Drop for Http3Dispatcher {