diff --git a/ylong_http_client/src/async_impl/pool.rs b/ylong_http_client/src/async_impl/pool.rs index 785931d61eb59f66de276ea8f2f1add4e8ceddd8..c84cfda356cd21cf6e647063f10140a1a3245516 100644 --- a/ylong_http_client/src/async_impl/pool.rs +++ b/ylong_http_client/src/async_impl/pool.rs @@ -419,13 +419,16 @@ impl Conns lock: &mut crate::runtime::MutexGuard>>, ) -> Option> { if let Some(dispatcher) = lock.pop() { - // todo: shutdown and goaway - if !dispatcher.is_shutdown() { + if dispatcher.is_shutdown() { + return None; + } + if !dispatcher.is_goaway() { if let Some(conn) = dispatcher.dispatch() { lock.push(dispatcher); return Some(conn); } } + lock.push(dispatcher); } None } diff --git a/ylong_http_client/src/util/dispatcher.rs b/ylong_http_client/src/util/dispatcher.rs index 8e37775c421e2e64714b59082a2308f0238ca2ae..5942f8eee0c3c3a65d3e4cfac89848ad4b2e8785 100644 --- a/ylong_http_client/src/util/dispatcher.rs +++ b/ylong_http_client/src/util/dispatcher.rs @@ -280,6 +280,7 @@ pub(crate) mod http2 { pub(crate) allowed_cache: usize, pub(crate) sender: UnboundedSender, pub(crate) io_shutdown: Arc, + pub(crate) io_goaway: Arc, pub(crate) handles: Vec>, pub(crate) _mark: PhantomData, } @@ -298,13 +299,14 @@ pub(crate) mod http2 { // The connection close flag organizes new stream commits to the current connection when // closed. pub(crate) io_shutdown: Arc, + pub(crate) io_goaway: Arc, // The senders of all connected stream channels of response. pub(crate) senders: HashMap>, pub(crate) curr_message: HashMap, // Stream information on the connection. pub(crate) streams: Streams, // Received GO_AWAY frame. - pub(crate) recved_go_away: Option, + pub(crate) go_away_error_code: Option, // The last GO_AWAY frame sent by the client. pub(crate) go_away_sync: GoAwaySync, } @@ -350,7 +352,9 @@ pub(crate) mod http2 { let streams = Streams::new(config.stream_window_size(), DEFAULT_WINDOW_SIZE, flow); let shutdown_flag = Arc::new(AtomicBool::new(false)); - let mut controller = StreamController::new(streams, shutdown_flag.clone()); + let goaway_flag = Arc::new(AtomicBool::new(false)); + let mut controller = + StreamController::new(streams, shutdown_flag.clone(), goaway_flag.clone()); let (input_tx, input_rx) = unbounded_channel(); let (req_tx, req_rx) = unbounded_channel(); @@ -382,6 +386,7 @@ pub(crate) mod http2 { allowed_cache: config.allowed_cache_frame_size(), sender: req_tx, io_shutdown: shutdown_flag, + io_goaway: goaway_flag, handles, _mark: PhantomData, } @@ -447,8 +452,7 @@ pub(crate) mod http2 { } fn is_goaway(&self) -> bool { - // todo: goaway and shutdown - false + self.io_goaway.load(Ordering::Relaxed) } } @@ -498,13 +502,18 @@ pub(crate) mod http2 { } impl StreamController { - pub(crate) fn new(streams: Streams, shutdown: Arc) -> Self { + pub(crate) fn new( + streams: Streams, + shutdown: Arc, + goaway: Arc, + ) -> Self { Self { io_shutdown: shutdown, + io_goaway: goaway, senders: HashMap::new(), curr_message: HashMap::new(), streams, - recved_go_away: None, + go_away_error_code: None, go_away_sync: GoAwaySync::default(), } } @@ -513,6 +522,10 @@ pub(crate) mod http2 { self.io_shutdown.store(true, Ordering::Release); } + pub(crate) fn goaway(&self) { + self.io_goaway.store(true, Ordering::Release); + } + pub(crate) fn get_unsent_streams( &mut self, last_stream_id: StreamId, @@ -523,7 +536,7 @@ pub(crate) mod http2 { return Err(H2Error::ConnectionError(ErrorCode::ProtocolError)); } self.streams.max_send_id = last_stream_id; - Ok(self.streams.get_go_away_streams(last_stream_id)) + Ok(self.streams.get_unset_streams(last_stream_id)) } pub(crate) fn send_message_to_stream( diff --git a/ylong_http_client/src/util/h2/manager.rs b/ylong_http_client/src/util/h2/manager.rs index 311b4750fc36fefb134db272e88dc1da32d58f81..0f12f499e2f26cf4f70078a8e63c1733762135dc 100644 --- a/ylong_http_client/src/util/h2/manager.rs +++ b/ylong_http_client/src/util/h2/manager.rs @@ -74,6 +74,10 @@ impl Future for ConnManager { } // io output occurs error. OutputMessage::OutputExit(e) => { + // Ever received a goaway frame + if let Some(_) = manager.controller.go_away_error_code { + continue; + } // Note error returned immediately. if manager.manage_resp_error(cx, e)?.is_pending() { return Poll::Pending; @@ -139,8 +143,9 @@ impl ConnManager { cx: &mut Context<'_>, ) -> Poll> { // The manager previously accepted a GOAWAY Frame. - if let Some(code) = self.controller.recved_go_away { - self.poll_deal_with_go_away(code)?; + if let Some(error_code) = self.controller.go_away_error_code { + self.poll_deal_with_go_away(error_code)?; + return Poll::Pending; } self.poll_recv_request(cx)?; self.poll_input_request(cx)?; @@ -389,7 +394,7 @@ impl ConnManager { return Poll::Ready(Ok(())); }; // Prevents the current connection from generating a new stream. - self.controller.shutdown(); + self.controller.goaway(); self.req_rx.close(); let last_stream_id = go_away.get_last_stream_id(); let streams = self.controller.get_unsent_streams(last_stream_id)?; @@ -411,7 +416,7 @@ impl ConnManager { } } // Exit after the allowed stream is complete. - self.controller.recved_go_away = Some(go_away.get_error_code()); + self.controller.go_away_error_code = Some(go_away.get_error_code()); if blocked { Poll::Pending } else { @@ -630,6 +635,8 @@ impl ConnManager { ); self.send_peer_goaway(frame, go_away_payload, error_code)?; + // close connection + self.controller.shutdown(); return Err(H2Error::ConnectionError(ErrorCode::try_from(error_code)?).into()); } Ok(()) diff --git a/ylong_http_client/src/util/h2/output.rs b/ylong_http_client/src/util/h2/output.rs index 366ae2fbcb78fef5f938ba3782070817610b68e2..34b2bc91315c32afca4a841f7d5cc1040bc3f3b1 100644 --- a/ylong_http_client/src/util/h2/output.rs +++ b/ylong_http_client/src/util/h2/output.rs @@ -93,7 +93,12 @@ impl RecvData { } let read = read_buf.filled().len(); if read == 0 { - return self.transmit_error(cx, DispatchErrorKind::Disconnect); + let _ = self.transmit_message( + cx, + OutputMessage::OutputExit(DispatchErrorKind::Disconnect), + ); + self.state = DecodeState::Send; + return Poll::Pending; } match self.decoder.decode(&buf[..read]) { diff --git a/ylong_http_client/src/util/h2/streams.rs b/ylong_http_client/src/util/h2/streams.rs index 7aff74622aa7c095607732364031e779de106d5b..40022e2b9631409db373d66b3ec65f76397984a1 100644 --- a/ylong_http_client/src/util/h2/streams.rs +++ b/ylong_http_client/src/util/h2/streams.rs @@ -493,10 +493,12 @@ impl Streams { } } - pub(crate) fn get_go_away_streams(&mut self, last_stream_id: StreamId) -> Vec { + // Get unset streams less than or equal to last_stream_id and change the state + // of streams greater than last_stream_id to RemoteAaway + pub(crate) fn get_unset_streams(&mut self, last_stream_id: StreamId) -> Vec { let mut ids = vec![]; for (id, unsent_stream) in self.stream_map.iter_mut() { - if *id >= last_stream_id { + if *id > last_stream_id { match unsent_stream.state { // TODO Whether the close state needs to be selected. H2StreamState::Closed(_) => {} @@ -910,13 +912,13 @@ mod ut_streams { .insert(4, stream_new(H2StreamState::Closed(CloseReason::EndStream))); streams.increase_current_concurrency(); - let go_away_streams = streams.get_go_away_streams(2); - assert!([2, 3, 4].iter().all(|&e| go_away_streams.contains(&e))); + let go_away_streams = streams.get_unset_streams(2); + assert!([3, 4].iter().all(|&e| go_away_streams.contains(&e))); let state = streams.stream_state(1).unwrap(); assert_eq!(state, H2StreamState::Idle); let state = streams.stream_state(2).unwrap(); - assert_eq!(state, H2StreamState::Closed(CloseReason::RemoteGoAway)); + assert_eq!(state, H2StreamState::Idle); let state = streams.stream_state(3).unwrap(); assert_eq!(state, H2StreamState::Closed(CloseReason::RemoteGoAway)); let state = streams.stream_state(4).unwrap();