diff --git a/todo_list/file_remote_transfer/file_remote_transfer.patch b/todo_list/file_remote_transfer/file_remote_transfer.patch new file mode 100644 index 0000000000000000000000000000000000000000..6dc82cc35986c9f4ba00a9ff6dbb3349bf31bd64 --- /dev/null +++ b/todo_list/file_remote_transfer/file_remote_transfer.patch @@ -0,0 +1,2546 @@ +diff --git a/BUILD.gn b/BUILD.gn +index cbb5ca4c9fffe8e4105ecb92854e280bbf16e4a6..785b6509886a4d9852098c5f410046bc5c78a938 100644 +--- a/BUILD.gn ++++ b/BUILD.gn +@@ -186,6 +186,7 @@ if (product_name != "ohos-sdk") { + + external_deps = [ + "bounds_checking_function:libsec_static", ++ "hicollie:hicollie_rust", + "hilog:libhilog", + "init:libbegetutil", + "lz4:liblz4_static", +@@ -224,6 +225,7 @@ if (product_name != "ohos-sdk") { + features += [ "emulator" ] + } + external_deps = [ ++ "hicollie:hicollie_rust", + "hilog:hilog_rust", + "ylong_runtime:ylong_runtime", + ] +@@ -273,7 +275,10 @@ template("build_hdc") { + "//third_party/rust/crates/nix:lib", + "//third_party/rust/crates/rust-openssl/openssl:lib", + ] +- external_deps = [ "hilog:hilog_rust" ] ++ external_deps = [ ++ "hicollie:hicollie_rust", ++ "hilog:hilog_rust", ++ ] + + if (product_name != "ohos_sdk") { + external_deps += [ "ylong_runtime:ylong_runtime" ] +diff --git a/hdc_rust/BUILD.gn b/hdc_rust/BUILD.gn +index 17f6f81738d52daa717cc4cd64fc000d988cad9f..748942039fc29d86c7535a8c72bf897512aa78bb 100644 +--- a/hdc_rust/BUILD.gn ++++ b/hdc_rust/BUILD.gn +@@ -35,6 +35,7 @@ ohos_static_library("serialize_structs") { + + external_deps = [ + "bounds_checking_function:libsec_static", ++ "hicollie:hicollie_rust", + "hilog:libhilog", + "init:libbegetutil", + "lz4:liblz4_static", +@@ -67,6 +68,7 @@ ohos_rust_executable("hdcd") { + "//third_party/rust/crates/rust-openssl/openssl:lib", + ] + external_deps = [ ++ "hicollie:hicollie_rust", + "hilog:hilog_rust", + "ylong_runtime:ylong_runtime", + ] +diff --git a/hdc_rust/src/common/forward.rs b/hdc_rust/src/common/forward.rs +index 5f5e6f63d39d8a7469a9546a4ffd5127be179375..c4ed4e1bc3953708b820e0d4a07faba845ee2f9a 100644 +--- a/hdc_rust/src/common/forward.rs ++++ b/hdc_rust/src/common/forward.rs +@@ -23,9 +23,11 @@ use libc::SOCK_STREAM; + use libc::{AF_LOCAL, AF_UNIX, FD_CLOEXEC, F_SETFD}; + use std::collections::HashMap; + #[cfg(not(target_os = "windows"))] +-use std::fs::{self, File, OpenOptions}; ++use std::fs::File; ++use std::fs::{self, OpenOptions}; + #[cfg(not(target_os = "windows"))] +-use std::io::{self, Error, ErrorKind, Read, Write}; ++use std::io::Read; ++use std::io::{Error, ErrorKind, Result, Write}; + use ylong_runtime::sync::{Mutex, RwLock}; + + use crate::common::base::Base; +@@ -105,7 +107,7 @@ pub struct ContextForward { + channel_id: u32, + check_order: bool, + is_master: bool, +- id: u32, ++ pub id: u32, + fd: i32, + target_fd: i32, + forward_type: ForwardType, +@@ -273,7 +275,7 @@ impl ForwardContextMap { + static ONCE: Once = Once::new(); + unsafe { + ONCE.call_once(|| { +- CONTEXT_MAP = MaybeUninit::new(Arc::new(Mutex::new(HashMap::new()))); ++ CONTEXT_MAP = MaybeUninit::new(Arc::new(Mutex::new(HashMap::new()))); + } + ); + &*CONTEXT_MAP.as_ptr() +@@ -399,14 +401,8 @@ impl ForwardTaskMap { + _ => "unknown".to_string(), + }; + let second_args = match forward_task.remote_args.len() { +- 0 => format!( +- "{}:{}", +- forward_task.local_args[0], forward_task.local_args[1] +- ), +- 2 => format!( +- "{}:{}", +- forward_task.remote_args[0], forward_task.remote_args[1] +- ), ++ 0 => "unknown".to_string(), ++ 2 => format!("{}:{}", forward_task.remote_args[0], forward_task.remote_args[1]), + _ => "unknown".to_string(), + }; + result.push_str(&format!( +@@ -481,7 +477,7 @@ pub struct HdcForward { + remote_args: Vec, + task_command: String, + forward_type: ForwardType, +- context_forward: ContextForward, ++ pub context_forward: ContextForward, + pub transfer: HdcTransferBase, + } + +@@ -536,7 +532,7 @@ pub fn check_node_info(value: &String, arg: &mut Vec) -> bool { + } + + #[cfg(feature = "host")] +-pub async fn on_forward_success(task_message: TaskMessage, session_id: u32) -> io::Result<()> { ++pub async fn on_forward_success(task_message: TaskMessage, session_id: u32) -> Result<()> { + crate::info!("on_forward_success"); + let channel_id = task_message.channel_id; + let payload = task_message.payload; +@@ -650,7 +646,7 @@ pub async fn detech_forward_type(ctx_point: &mut ContextForward) -> bool { + true + } + +-pub async fn forward_tcp_accept(ctx: &mut ContextForward, port: u32) -> io::Result<()> { ++pub async fn forward_tcp_accept(ctx: &mut ContextForward, port: u32) -> Result<()> { + let saddr = format!("127.0.0.1:{}", port); + let session_tmp = ctx.session_id; + let channel_tmp = ctx.channel_id; +@@ -747,7 +743,7 @@ pub async fn daemon_connect_tcp(cid: u32, port: u32) { + }); + } + +-pub async fn update_context_to_task(session_id: u32, channel_id:u32, ctx: &mut ContextForward) { ++pub async fn update_context_to_task(session_id: u32, channel_id: u32, ctx: &mut ContextForward) { + let Some(task) = ForwardTaskMap::get(ctx.session_id, ctx.channel_id).await else { + crate::error!( + "update context to task is none session_id={:#?},channel_id={:#?}", +@@ -756,7 +752,7 @@ pub async fn update_context_to_task(session_id: u32, channel_id:u32, ctx: &mut C + ); + return; + }; +- let task = &mut task.clone(); ++ let task = &mut task.clone(); + task.context_forward = ctx.clone(); + ForwardTaskMap::update(session_id, channel_id, task.clone()).await; + } +@@ -848,17 +844,15 @@ pub async fn free_context(cid: u32, notify_remote: bool) { + match ctx.forward_type { + ForwardType::Tcp => { + TcpWriteStreamMap::end(ctx.id).await; ++ TcpListenerMap::end(ctx.channel_id).await; + } + ForwardType::Jdwp | ForwardType::Ark => { + TcpWriteStreamMap::end(ctx.id).await; + let ret = unsafe { libc::close(ctx.fd) }; + crate::debug!("close context_forward fd, ret={}, id={}", ret, ctx.id,); + let target_fd_ret = unsafe { libc::close(ctx.target_fd) }; +- crate::debug!( +- "close context_forward target fd, ret={}, id={}", +- target_fd_ret, +- ctx.id, +- ); ++ crate::debug!("close context_forward target fd, ret={}, id={}", target_fd_ret, ctx.id,); ++ TcpListenerMap::end(ctx.channel_id).await; + } + ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => { + crate::error!("Abstract begin to free forward close fd = {:#?}", ctx.fd); +@@ -938,7 +932,7 @@ async fn server_socket_bind_listen(ctx: &mut ContextForward, path: String) -> bo + true + } + +-pub async fn canonicalize(path: String) -> Result { ++pub async fn canonicalize(path: String) -> Result { + match fs::canonicalize(path) { + Ok(abs_path) => match abs_path.to_str() { + Some(path) => Ok(path.to_string()), +@@ -1144,7 +1138,7 @@ pub async fn daemon_connect_pipe(ctx: &mut ContextForward) { + }); + let addr = UdsAddr::parse_abstract(&socket_name[1..]); + if let Ok(addr_obj) = &addr { +- let ret: Result<(), Error> = UdsClient::wrap_connect(ctx.fd, addr_obj); ++ let ret = UdsClient::wrap_connect(ctx.fd, addr_obj); + if ret.is_err() { + hdctransfer::echo_client( + ctx.session_id, +@@ -1410,13 +1404,13 @@ pub async fn slave_connect( + } + + pub async fn read_data_to_forward(ctx: &mut ContextForward) { +- let cid = ctx.id; +- let session = ctx.session_id; +- let channel = ctx.channel_id; + match ctx.forward_type { + ForwardType::Abstract | ForwardType::FileSystem | ForwardType::Reserved => { + #[cfg(not(target_os = "windows"))] + { ++ let cid = ctx.id; ++ let session = ctx.session_id; ++ let channel = ctx.channel_id; + let fd_temp = ctx.fd; + utils::spawn(async move { + deamon_read_socket_msg(session, channel, fd_temp, cid).await +@@ -1431,9 +1425,9 @@ pub async fn read_data_to_forward(ctx: &mut ContextForward) { + } + } + +-pub fn filter_command(_payload: &[u8]) -> io::Result<(String, u32)> { ++pub fn filter_command(_payload: &[u8]) -> Result<(String, u32)> { + let bytes = &_payload[4..]; +- let ct: Result = String::from_utf8(bytes.to_vec()); ++ let ct = String::from_utf8(bytes.to_vec()); + if let Ok(content) = ct { + let mut id_bytes = [0u8; 4]; + id_bytes.copy_from_slice(&_payload[0..4]); +diff --git a/hdc_rust/src/common/hdcfile.rs b/hdc_rust/src/common/hdcfile.rs +index b5d55d4b3adfec4759700d1d9813402604c9cd51..80534b89e858771ab047ef396b77e8e801fc78ab 100644 +--- a/hdc_rust/src/common/hdcfile.rs ++++ b/hdc_rust/src/common/hdcfile.rs +@@ -20,9 +20,9 @@ use std::fs::metadata; + + use std::collections::HashMap; + use std::io; ++use std::io::{Error, ErrorKind}; + use std::path::Path; + use std::sync::Arc; +-use std::io::{Error, ErrorKind}; + #[cfg(feature = "host")] + extern crate ylong_runtime_static as ylong_runtime; + use ylong_runtime::sync::Mutex; +@@ -243,9 +243,6 @@ async fn set_master_parameters( + let mut task = task.lock().await; + let mut i: usize = 0; + let mut src_argv_index = 0u32; +- if task.transfer.server_or_daemon { +- src_argv_index += 2; // 2: represent the host parameters: "file" "send". +- } // else: src_argv_index += 0: the host parameters "file" "recv" will be filtered. + while i < argc as usize { + match &argv[i] as &str { + "-z" => { +@@ -343,7 +340,7 @@ async fn put_file_check(session_id: u32, channel_id: u32) { + command: HdcCommand::FileCheck, + payload: task.transfer.transfer_config.serialize(), + }; +- transfer::put(task.transfer.session_id, file_check_message).await; ++ transfer::send_to_another(task.transfer.session_id, file_check_message).await; + } + + pub async fn check_slaver(session_id: u32, channel_id: u32, _payload: &[u8]) -> Result { +@@ -396,7 +393,7 @@ pub async fn wake_up_slaver(session_id: u32, channel_id: u32) { + command: HdcCommand::KernelWakeupSlavetask, + payload: Vec::::new(), + }; +- transfer::put(session_id, wake_up_message).await; ++ transfer::send_to_another(session_id, wake_up_message).await; + } + + async fn put_file_begin(session_id: u32, channel_id: u32) { +@@ -405,7 +402,7 @@ async fn put_file_begin(session_id: u32, channel_id: u32) { + command: HdcCommand::FileBegin, + payload: Vec::::new(), + }; +- transfer::put(session_id, file_begin_message).await; ++ transfer::send_to_another(session_id, file_begin_message).await; + } + + async fn transfer_next(session_id: u32, channel_id: u32) -> bool { +@@ -459,41 +456,10 @@ async fn on_all_transfer_finish(session_id: u32, channel_id: u32) { + size, task.file_cnt, time, rate + ) + } else { +- format!( +- "Transfer failed: {}: {}", +- task.transfer.local_path, +- io::Error::from_raw_os_error(last_error as i32), +- ) ++ format!("Transfer failed: {}: {}", task.transfer.local_path, io::Error::from_raw_os_error(last_error as i32),) + }; +- #[cfg(feature = "host")] +- { +- let level = if last_error == 0 { +- transfer::EchoLevel::OK +- } else { +- transfer::EchoLevel::FAIL +- }; +- let _ = +- transfer::send_channel_msg(task.transfer.channel_id, level, message) +- .await; +- hdctransfer::close_channel(channel_id).await; +- return; +- } +- #[allow(unreachable_code)] +- { +- let level = if last_error == 0 { +- MessageLevel::Ok +- } else { +- MessageLevel::Fail +- }; +- hdctransfer::echo_client( +- task.transfer.session_id, +- task.transfer.channel_id, +- message.as_str(), +- level, +- ) +- .await; +- hdctransfer::close_channel(channel_id).await; +- } ++ let level = if last_error == 0 { MessageLevel::Ok } else { MessageLevel::Fail }; ++ hdctransfer::echo_client(task.transfer.session_id, task.transfer.channel_id, message.as_str(), level).await; + } + + async fn is_task_queue_empty(session_id: u32, channel_id: u32) -> bool { +@@ -523,7 +489,7 @@ async fn do_file_finish(session_id: u32, channel_id: u32, _payload: &[u8]) { + payload: [0].to_vec(), + }; + +- transfer::put(session_id, _finish_message).await; ++ transfer::send_to_another(session_id, _finish_message).await; + } + } else { + on_all_transfer_finish(session_id, channel_id).await; +@@ -547,7 +513,7 @@ async fn put_file_finish(session_id: u32, channel_id: u32) { + command: HdcCommand::FileFinish, + payload: _payload.to_vec(), + }; +- transfer::put(session_id, task_finish_message).await; ++ transfer::send_to_another(session_id, task_finish_message).await; + } + + pub async fn command_dispatch( +@@ -600,6 +566,11 @@ pub async fn command_dispatch( + return false; + }; + let mut task = task.lock().await; ++ if task.transfer.stop_run { ++ crate::error!("stop_run {}", task.transfer.stop_run); ++ task_finish(session_id, channel_id).await; ++ return false; ++ } + if hdctransfer::transfer_data(&mut task.transfer, _payload) { + drop(task); + put_file_finish(session_id, channel_id).await; +@@ -620,12 +591,8 @@ pub async fn command_dispatch( + } + + async fn put_file_mode(session_id: u32, channel_id: u32) { +- let task_message = TaskMessage { +- channel_id, +- command: HdcCommand::FileMode, +- payload: Vec::::new(), +- }; +- transfer::put(session_id, task_message).await; ++ let task_message = TaskMessage { channel_id, command: HdcCommand::FileMode, payload: Vec::::new() }; ++ transfer::send_to_another(session_id, task_message).await; + } + + async fn task_finish(session_id: u32, channel_id: u32) { +@@ -650,18 +617,8 @@ pub async fn echo_fail(session_id: u32, channel_id: u32, error: Error, is_checke + format!("{}", error) + } + } +- None => format!( +- "Error opening file: {}, path: {}", +- error, +- "cannot get file path from FileTaskMap", +- ) ++ None => format!("Error opening file: {}, path: {}", error, "cannot get file path from FileTaskMap",), + }; +- hdctransfer::echo_client( +- session_id, +- channel_id, +- message.as_str(), +- MessageLevel::Fail, +- ) +- .await; ++ hdctransfer::echo_client(session_id, channel_id, message.as_str(), MessageLevel::Fail).await; + task_finish(session_id, channel_id).await; +-} +\ No newline at end of file ++} +diff --git a/hdc_rust/src/common/hdctransfer.rs b/hdc_rust/src/common/hdctransfer.rs +index 8998ea81754f248b80035c306e036434b2162700..18f0eefb06dd04e905983d11e3cd3f91641efe97 100644 +--- a/hdc_rust/src/common/hdctransfer.rs ++++ b/hdc_rust/src/common/hdctransfer.rs +@@ -15,26 +15,26 @@ + //! hdctransfer + #![allow(missing_docs)] + use std::collections::VecDeque; +-use std::fs::{self, File, OpenOptions, metadata}; +-use std::io::{Read, Seek, Write, Error}; +-use std::path::PathBuf; +-#[cfg(not(target_os = "windows"))] +-use std::path::Path; ++use std::fs::{self, metadata, File, OpenOptions}; ++use std::io::{Error, Read, Seek, Write}; + #[cfg(not(target_os = "windows"))] + use std::os::unix::fs::PermissionsExt; ++#[cfg(not(target_os = "windows"))] ++use std::path::Path; ++use std::path::PathBuf; + use std::sync::Arc; + + use crate::common::base::Base; + use crate::common::hdcfile::FileTaskMap; + use crate::config::HdcCommand; + use crate::config::TaskMessage; +-use crate::{config::*, utils}; + use crate::serializer::native_struct::TransferConfig; + use crate::serializer::native_struct::TransferPayload; + use crate::serializer::serialize::Serialization; + use crate::transfer; + #[cfg(not(feature = "host"))] + use crate::utils::hdc_log::*; ++use crate::{config::*, utils}; + #[cfg(feature = "host")] + extern crate ylong_runtime_static as ylong_runtime; + use ylong_runtime::sync::Mutex; +@@ -448,7 +448,7 @@ pub async fn read_and_send_data( + let Ok((is_finish, task_message)) = handler.await else { + continue; + }; +- transfer::put(session_id, task_message).await; ++ transfer::send_to_another(session_id, task_message).await; + if is_finish { + crate::debug!("read_and_send_data is finish return false"); + return false; +@@ -617,7 +617,7 @@ pub async fn transfer_task_finish(channel_id: u32, _session_id: u32) { + command: HdcCommand::KernelChannelClose, + payload: [1].to_vec(), + }; +- transfer::put(_session_id, task_message).await; ++ transfer::send_to_another(_session_id, task_message).await; + } + + pub async fn transfer_file_finish(channel_id: u32, _session_id: u32, comamnd_finish: HdcCommand) { +@@ -626,7 +626,7 @@ pub async fn transfer_file_finish(channel_id: u32, _session_id: u32, comamnd_fin + command: comamnd_finish, + payload: [1].to_vec(), + }; +- transfer::put(_session_id, task_message).await; ++ transfer::send_to_another(_session_id, task_message).await; + } + + pub async fn close_channel(channel_id: u32) { +@@ -657,4 +657,4 @@ pub async fn echo_client(_session_id: u32, channel_id: u32, message: &str, level + }; + transfer::put(_session_id, echo_message).await; + } +-} +\ No newline at end of file ++} +diff --git a/hdc_rust/src/config.rs b/hdc_rust/src/config.rs +index 4f9c0d35b1731171ad1d7f864cb19b637d5f9e1a..d94f934087f08a330f34f1793446733af511a751 100644 +--- a/hdc_rust/src/config.rs ++++ b/hdc_rust/src/config.rs +@@ -156,6 +156,7 @@ pub enum HdcCommand { + FlashdProgress, + + UartFinish, ++ Unknown, + } + + impl TryFrom for HdcCommand { +@@ -355,6 +356,7 @@ pub const TAG_PUBKEY: &str = "pubkey"; + pub const TAG_EMGMSG: &str = "emgmsg"; + pub const TAG_TOKEN: &str = "token"; + pub const TAG_DAEOMN_AUTHSTATUS: &str = "daemonauthstatus"; ++pub const CMDSTR_REMOTE_PARAMETER: &str = "remote"; + + pub const LOG_LEVEL_ORDER: [LevelFilter; 7] = [ + LevelFilter::Off, +diff --git a/hdc_rust/src/daemon_lib/mod.rs b/hdc_rust/src/daemon_lib/mod.rs +index 7d3b296b3718b4ab2a8c24db24ec10e9282e6b19..a6cc2fe5baed664f3da5e217d5725f231386ae4a 100644 +--- a/hdc_rust/src/daemon_lib/mod.rs ++++ b/hdc_rust/src/daemon_lib/mod.rs +@@ -16,43 +16,45 @@ + #![allow(missing_docs)] + + pub mod auth; ++#[cfg(feature = "emulator")] ++pub mod bridge; + pub mod daemon_app; + pub mod daemon_unity; + pub mod mount; + pub mod shell; ++pub mod sys_para; + pub mod task; + pub mod task_manager; +-pub mod sys_para; +- +-#[cfg(feature = "emulator")] +-pub mod bridge; +- +-use std::io::{self, ErrorKind}; +-use std::sync::Arc; +-use std::ffi::c_int; +-use crate::utils::{self, hdc_log::*}; + + use crate::common::jdwp::Jdwp; + use crate::config; + use crate::config::TaskMessage; + #[cfg(feature = "emulator")] + use crate::daemon_lib::bridge; ++use crate::daemon_lib::sys_para::*; ++use crate::daemon_transfer::usb; + use crate::transfer; + #[cfg(not(feature = "emulator"))] + use crate::transfer::base::Reader; ++use crate::transfer::buffer::DiedSession; + #[cfg(not(feature = "emulator"))] + use crate::transfer::uart::UartReader; + #[cfg(not(feature = "emulator"))] + use crate::transfer::uart_wrapper; +-use crate::transfer::buffer::DiedSession; ++use crate::utils::{self, hdc_log::*}; + +-use crate::daemon_lib::sys_para::*; ++use std::ffi::c_int; + use std::ffi::CString; ++use std::io::{self, ErrorKind}; ++use std::sync::Arc; + #[cfg(not(feature = "emulator"))] + use ylong_runtime::net::{TcpListener, TcpStream}; + #[cfg(not(feature = "emulator"))] + use ylong_runtime::sync::mpsc; + ++use libc::c_void; ++type XCollieCallbackRust = extern "C" fn(arg: *mut libc::c_void); ++ + extern "C" { + #[cfg(not(feature = "emulator"))] + fn NeedDropRootPrivileges() -> c_int; +@@ -356,11 +358,11 @@ pub async fn uart_handle_client(fd: i32) -> io::Result<()> { + #[cfg(not(feature = "emulator"))] + pub async fn usb_daemon_start() -> io::Result<()> { + loop { +- let ret = transfer::usb::usb_init(); ++ let ret = usb::usb_init(); + match ret { + Ok((config_fd, bulkin_fd, bulkout_fd)) => { + let _ = usb_handle_client(config_fd, bulkin_fd, bulkout_fd).await; +- transfer::usb::usb_close(config_fd, bulkin_fd, bulkout_fd); ++ usb::usb_close(config_fd, bulkin_fd, bulkout_fd); + } + Err(e) => { + crate::error!("usb init failure and restart hdcd error is {:?}", e); +@@ -370,10 +372,15 @@ pub async fn usb_daemon_start() -> io::Result<()> { + } + } + ++/// # Safety ++pub unsafe extern "C" fn new_session_callback(_ptr: *mut c_void) { ++ crate::error!("new session proc timeout, will restart hdcd"); ++} ++ + #[cfg(not(feature = "emulator"))] + pub async fn usb_handle_client(_config_fd: i32, bulkin_fd: i32, bulkout_fd: i32) -> io::Result<()> { +- let _rd = transfer::usb::UsbReader { fd: bulkin_fd }; +- let mut rx = transfer::usb_start_recv(bulkin_fd, 0); ++ let _rd = usb::UsbReader { fd: bulkin_fd }; ++ let mut rx = usb::usb_start_recv(bulkin_fd, 0); + let mut cur_session_id = 0; + loop { + match rx.recv().await { +@@ -381,11 +388,16 @@ pub async fn usb_handle_client(_config_fd: i32, bulkin_fd: i32, bulkout_fd: i32) + if msg.command == config::HdcCommand::KernelHandshake { + if let Ok(session_id_in_msg) = auth::get_session_id_from_msg(&msg).await { + if session_id_in_msg != cur_session_id { ++ let new_session_func: XCollieCallbackRust = unsafe { ++ std::mem::transmute::<*const (), XCollieCallbackRust>(new_session_callback as *const ()) ++ }; ++ let timer = unsafe { hicollie_rust::set_timer("new session".as_ptr(), 10 /* second */, new_session_func, std::ptr::null_mut() as *mut c_void, 3) }; + task_manager::free_session(cur_session_id).await; + crate::info!("free session(usb) over {:?} and new session is {}", cur_session_id, session_id_in_msg); +- let wr = transfer::usb::UsbWriter { fd: bulkout_fd }; +- transfer::UsbMap::start(session_id_in_msg, wr).await; ++ let wr = usb::UsbWriter { fd: bulkout_fd }; ++ usb::UsbMap::start(session_id_in_msg, wr).await; + cur_session_id = session_id_in_msg; ++ hicollie_rust::cancel_timer(timer); + } + } + } +diff --git a/hdc_rust/src/daemon_lib/shell.rs b/hdc_rust/src/daemon_lib/shell.rs +index 663b70f4d79f34f127d89326eeeec2b96937a3b9..fe02335cff38ce8be321895b7ad7265e2a942469 100644 +--- a/hdc_rust/src/daemon_lib/shell.rs ++++ b/hdc_rust/src/daemon_lib/shell.rs +@@ -36,6 +36,7 @@ use ylong_runtime::process::{Child, Command, ChildStdin, ChildStdout, ChildStder + use ylong_runtime::io::{AsyncReadExt, AsyncWriteExt, AsyncBufReader}; + use ylong_runtime::sync::{mpsc, Mutex}; + use ylong_runtime::sync::error::TryRecvError::Closed; ++use libc::c_int; + + + // -----inner common functions----- +@@ -484,6 +485,43 @@ impl PtyMap { + } + + // -----noninteractive shell implementation----- ++type ShellExecuteChildMap_ = std::sync::Mutex>; ++pub struct ShellExecuteChildMap {} ++impl ShellExecuteChildMap { ++ fn get_instance() -> &'static ShellExecuteChildMap_ { ++ static mut SHELLEXECUTECHILD_MAP: MaybeUninit = MaybeUninit::uninit(); ++ static ONCE: Once = Once::new(); ++ ++ unsafe { ++ ONCE.call_once(|| { ++ SHELLEXECUTECHILD_MAP = MaybeUninit::new(std::sync::Mutex::new(HashMap::new())); ++ } ++ ); ++ &*SHELLEXECUTECHILD_MAP.as_ptr() ++ } ++ } ++ ++ pub async fn put(session_id: u32, channel_id: u32, childpid: i32) { ++ let shell_execute_child_map = Self::get_instance(); ++ let mut map = shell_execute_child_map.lock().unwrap(); ++ map.insert((session_id, channel_id), childpid); ++ } ++ ++ pub async fn del(session_id: u32, channel_id: u32) { ++ let shell_execute_child_map = Self::get_instance(); ++ let mut map = shell_execute_child_map.lock().unwrap(); ++ if let Some(childpid) = map.get(&(session_id, channel_id)) { ++ crate::debug!("kill childpid:{:?}", childpid); ++ let childpid_tmp = *childpid; ++ unsafe { ++ libc::kill(childpid_tmp,libc::SIGTERM); ++ let mut status: c_int = 0; ++ libc::waitpid(childpid_tmp, &mut status, 0); ++ }; ++ } ++ map.remove(&(session_id, channel_id)); ++ } ++} + + type ShellExecuteMap_ = Mutex>>; + pub struct ShellExecuteMap {} +@@ -523,7 +561,6 @@ impl ShellExecuteMap { + if iter.0 .0 != session_id { + continue; + } +- iter.1.handle.cancel(); + channel_vec.push(iter.0 .1); + crate::debug!( + "Clear shell_execute_map task, session_id: {}, channel_id:{}, task_size: {}", +@@ -534,6 +571,7 @@ impl ShellExecuteMap { + } + for channel_id in channel_vec{ + map.remove(&(session_id, channel_id)); ++ ShellExecuteChildMap::del(session_id, channel_id).await; + } + } + } +@@ -612,8 +650,7 @@ async fn task_for_shell_execute( + shell_cmd.args(["-c", &cmd]) + .stdout(Stdio::piped()) + .stdin(Stdio::piped()) +- .stderr(Stdio::piped()) +- .kill_on_drop(true); ++ .stderr(Stdio::piped()); + + unsafe { + shell_cmd.pre_exec(|| { +@@ -626,6 +663,13 @@ async fn task_for_shell_execute( + } + + if let Ok(mut child) = shell_cmd.spawn() { ++ if let Some(childpid) = child.id() { ++ crate::error!("connection[session:{:?}, channel:{:?}] pid is {:?}", shell_task_id.session_id, shell_task_id.channel_id, childpid); ++ ShellExecuteChildMap::put(shell_task_id.session_id, shell_task_id.channel_id, childpid as i32).await; ++ } else { ++ crate::error!("get child pid error"); ++ } ++ + + let mut child_in = match child.take_stdin() { + Some(child_in_inner) => { +diff --git a/hdc_rust/src/daemon_lib/task_manager.rs b/hdc_rust/src/daemon_lib/task_manager.rs +index 5b7f0a90ef0bfd389f63508af3ed3b58870651a0..b33c63ca3114e4a7b02a782f58c4082cec2f93fe 100644 +--- a/hdc_rust/src/daemon_lib/task_manager.rs ++++ b/hdc_rust/src/daemon_lib/task_manager.rs +@@ -13,24 +13,23 @@ + * limitations under the License. + */ + //! shell +- +-#[allow(unused_imports)] +-use crate::daemon_lib::daemon_app; +-use crate::daemon_lib::shell; +-use crate::daemon_lib::auth; + #[allow(unused_imports)] + use crate::common::forward; + #[allow(unused_imports)] +-use crate::common::jdwp; +-#[allow(unused_imports)] + use crate::common::hdcfile; + #[allow(unused_imports)] +-use crate::utils::hdc_log::*; ++use crate::common::jdwp; + use crate::config::ConnectType; ++use crate::daemon_lib::auth; ++#[allow(unused_imports)] ++use crate::daemon_lib::daemon_app; ++use crate::daemon_lib::shell; ++use crate::daemon_transfer::usb::UsbMap; + use crate::transfer::buffer; +-use crate::transfer::TcpMap; +-use crate::transfer::UsbMap; + use crate::transfer::ConnectTypeMap; ++use crate::transfer::TcpMap; ++#[allow(unused_imports)] ++use crate::utils::hdc_log::*; + + pub async fn free_all_sessiones() { + let sessiones = ConnectTypeMap::get_all_session().await; +diff --git a/hdc_rust/src/daemon_transfer.rs b/hdc_rust/src/daemon_transfer.rs +new file mode 100644 +index 0000000000000000000000000000000000000000..c7ab55d232e84090c30af0d0d0aa28ced1566a53 +--- /dev/null ++++ b/hdc_rust/src/daemon_transfer.rs +@@ -0,0 +1,17 @@ ++/* ++ * Copyright (C) 2024 Huawei Device Co., Ltd. ++ * Licensed under the Apache License, Version 2.0 (the "License"); ++ * you may not use this file except in compliance with the License. ++ * You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++//! daemon_transfer ++#![allow(missing_docs)] ++pub mod usb; +diff --git a/hdc_rust/src/daemon_transfer/usb.rs b/hdc_rust/src/daemon_transfer/usb.rs +new file mode 100644 +index 0000000000000000000000000000000000000000..4ce4d0df9a3559f24d8dfed77d7ba6e6a30ee8ca +--- /dev/null ++++ b/hdc_rust/src/daemon_transfer/usb.rs +@@ -0,0 +1,398 @@ ++/* ++ * Copyright (C) 2023 Huawei Device Co., Ltd. ++ * Licensed under the Apache License, Version 2.0 (the "License"); ++ * you may not use this file except in compliance with the License. ++ * You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++//! usb ++#![allow(missing_docs)] ++use crate::config::{self, ConnectType, HdcCommand, TaskMessage}; ++ ++use crate::utils; ++#[allow(unused)] ++use crate::utils::hdc_log::*; ++ ++use crate::serializer; ++use crate::serializer::native_struct::UsbHead; ++use crate::serializer::pack_struct::UsbHeadPack; ++use crate::serializer::serialize::{Serialization, SerializedBuffer}; ++ ++use crate::transfer::base::{self, Reader}; ++use crate::transfer::buffer::DiedSession; ++use crate::transfer::ConnectTypeMap; ++ ++use ylong_runtime::sync::mpsc; ++use ylong_runtime::sync::mpsc::BoundedSender; ++ ++#[cfg(not(feature = "host"))] ++use libc::{fcntl, FD_CLOEXEC, F_SETFD}; ++use std::collections::HashMap; ++#[cfg(not(target_os = "windows"))] ++use std::ffi::{CStr, CString}; ++use std::io::{self, Error, ErrorKind}; ++use std::mem::MaybeUninit; ++use std::sync::Once; ++use std::time::Duration; ++ ++#[repr(C)] ++pub struct PersistBuffer { ++ pub ptr: *const libc::c_char, ++ pub size: libc::c_ulonglong, ++} ++ ++pub fn buf_to_vec(buf: PersistBuffer) -> Vec { ++ let slice = unsafe { std::slice::from_raw_parts(buf.ptr as *const libc::c_uchar, buf.size as usize) }; ++ slice.to_vec() ++} ++ ++#[allow(unused)] ++extern "C" { ++ fn access(_name: *const libc::c_char, _type: i32) -> i32; ++ fn free(ptr: *const libc::c_void); ++ ++ fn ConfigEpPointEx(path: *const libc::c_char) -> i32; ++ fn OpenEpPointEx(path: *const libc::c_char) -> i32; ++ fn CloseUsbFdEx(fd: i32) -> i32; ++ fn CloseEndPointEx(bulkIn: i32, bulkOut: i32, ctrlEp: i32, closeCtrlEp: u8); ++ #[cfg(not(target_os = "windows"))] ++ fn WriteUsbDevEx(bulkOut: i32, buf: SerializedBuffer) -> i32; ++ #[cfg(not(target_os = "windows"))] ++ fn ReadUsbDevEx(bulkIn: i32, buf: *mut u8, size: usize) -> usize; ++ fn GetDevPathEx(path: *const libc::c_char) -> *const libc::c_char; ++ ++ fn SerializeUsbHead(value: *const UsbHeadPack) -> SerializedBuffer; ++ fn ParseUsbHead(value: *mut UsbHeadPack, buf: SerializedBuffer) -> libc::c_uchar; ++} ++ ++#[cfg(not(target_os = "windows"))] ++pub fn usb_init() -> io::Result<(i32, i32, i32)> { ++ crate::info!("opening usb fd..."); ++ let path = CString::new(config::USB_FFS_BASE).unwrap(); ++ ++ let base_path = unsafe { ++ let p = GetDevPathEx(path.as_ptr()); ++ let c_str = CStr::from_ptr(p); ++ c_str.to_str().unwrap().to_string() ++ }; ++ // let c_str: &CStr = unsafe { CStr::from_ptr(p) }; ++ // c_str.to_str().unwrap().to_string() ++ // let base_path = serializer::ptr_to_string(unsafe { GetDevPathEx(path.as_ptr()) }); ++ let ep0 = CString::new(base_path.clone() + "/ep0").unwrap(); ++ let ep1 = CString::new(base_path.clone() + "/ep1").unwrap(); ++ let ep2 = CString::new(base_path + "/ep2").unwrap(); ++ if unsafe { access(ep0.as_ptr(), 0) } != 0 { ++ return Err(utils::error_other("cannot access usb path".to_string())); ++ } ++ ++ let config_fd = unsafe { ConfigEpPointEx(ep0.as_ptr()) }; ++ if config_fd < 0 { ++ return Err(utils::error_other("cannot open usb ep0".to_string())); ++ } ++ ++ let bulkin_fd = unsafe { OpenEpPointEx(ep1.as_ptr()) }; ++ if bulkin_fd < 0 { ++ return Err(utils::error_other("cannot open usb ep1".to_string())); ++ } ++ ++ let bulkout_fd = unsafe { OpenEpPointEx(ep2.as_ptr()) }; ++ if bulkout_fd < 0 { ++ return Err(utils::error_other("cannot open usb ep2".to_string())); ++ } ++ #[cfg(not(feature = "host"))] ++ unsafe { ++ // cannot open with O_CLOEXEC, must fcntl ++ fcntl(config_fd, F_SETFD, FD_CLOEXEC); ++ fcntl(bulkin_fd, F_SETFD, FD_CLOEXEC); ++ fcntl(bulkout_fd, F_SETFD, FD_CLOEXEC); ++ } ++ ++ crate::info!("usb fd: {config_fd}, {bulkin_fd}, {bulkout_fd}"); ++ ++ Ok((config_fd, bulkin_fd, bulkout_fd)) ++} ++ ++#[cfg(not(target_os = "windows"))] ++pub fn usb_close(config_fd: i32, bulkin_fd: i32, bulkout_fd: i32) { ++ crate::info!("closing usb fd..."); ++ unsafe { ++ CloseUsbFdEx(config_fd); ++ CloseUsbFdEx(bulkin_fd); ++ CloseUsbFdEx(bulkout_fd); ++ } ++} ++ ++pub struct UsbReader { ++ pub fd: i32, ++} ++pub struct UsbWriter { ++ pub fd: i32, ++} ++ ++impl base::Reader for UsbReader { ++ // 屏蔽window编译报错 ++ #[cfg(not(target_os = "windows"))] ++ fn read_frame(&self, expect: usize) -> io::Result> { ++ let mut buf: Vec = Vec::with_capacity(expect); ++ unsafe { ++ let readed = ReadUsbDevEx(self.fd, buf.as_mut_ptr() as *mut libc::uint8_t, expect); ++ if readed != expect { ++ Err(utils::error_other(format!( ++ "usb read error, usb read failed: expect: {} acture: {}", ++ expect, readed, ++ ))) ++ } else { ++ buf.set_len(readed); ++ Ok(buf) ++ } ++ } ++ } ++ ++ // 屏蔽window编译报错 ++ #[cfg(target_os = "windows")] ++ fn read_frame(&self, _expected_size: usize) -> io::Result> { ++ Err(utils::error_other("usb read error".to_string())) ++ } ++ ++ fn check_protocol_head(&mut self) -> io::Result<(u32, u32, u32)> { ++ let buf = self.read_frame(serializer::USB_HEAD_SIZE)?; ++ if buf[..config::USB_PACKET_FLAG.len()] != config::USB_PACKET_FLAG[..] { ++ return Err(Error::new(ErrorKind::Other, format!("USB_PACKET_FLAG incorrect, content: {:#?}", buf))); ++ } ++ let mut head = serializer::native_struct::UsbHead::default(); ++ ++ if let Err(e) = head.parse(buf) { ++ crate::warn!("parse usb head error: {}", e.to_string()); ++ return Err(e); ++ } ++ Ok((u32::from_be(head.data_size), 0, u32::to_be(head.session_id))) ++ } ++} ++ ++#[cfg(not(target_os = "windows"))] ++pub fn usb_write_all(fd: i32, data: Vec) -> io::Result { ++ let buf = SerializedBuffer { ptr: data.as_ptr() as *const libc::c_char, size: data.len() as u64 }; ++ let ret = unsafe { WriteUsbDevEx(fd, buf) } as i32; ++ if ret < 0 { ++ Err(utils::error_other("usb write failed".to_string())) ++ } else { ++ Ok(ret) ++ } ++} ++impl base::Writer for UsbWriter { ++ // 屏蔽window编译报错 ++ #[cfg(not(target_os = "windows"))] ++ #[allow(unused)] ++ fn write_all(&self, data: Vec) -> io::Result { ++ let buf = SerializedBuffer { ptr: data.as_ptr() as *const libc::c_char, size: data.len() as u64 }; ++ let ret = unsafe { WriteUsbDevEx(self.fd, buf) } as i32; ++ if ret < 0 { ++ Err(utils::error_other("usb write failed".to_string())) ++ } else { ++ Ok(ret) ++ } ++ } ++ ++ // 屏蔽window编译报错 ++ #[cfg(target_os = "windows")] ++ fn write_all(&self, _data: Vec) -> io::Result { ++ Ok(0) ++ } ++} ++ ++pub fn build_header(session_id: u32, option: u8, length: usize) -> Vec { ++ UsbHead { ++ session_id: u32::to_be(session_id), ++ flag: [config::USB_PACKET_FLAG[0], config::USB_PACKET_FLAG[1]], ++ option, ++ data_size: u32::to_be(length as u32), ++ } ++ .serialize() ++} ++ ++pub struct UsbMap { ++ map: std::sync::Mutex>, ++ lock: std::sync::Mutex, ++} ++impl UsbMap { ++ pub(crate) fn get_instance() -> &'static UsbMap { ++ static mut USB_MAP: MaybeUninit = MaybeUninit::uninit(); ++ static ONCE: Once = Once::new(); ++ ++ unsafe { ++ ONCE.call_once(|| { ++ let global = UsbMap { map: std::sync::Mutex::new(HashMap::new()), lock: std::sync::Mutex::new(0) }; ++ USB_MAP = MaybeUninit::new(global); ++ }); ++ &*USB_MAP.as_ptr() ++ } ++ } ++ ++ #[allow(unused)] ++ pub async fn put(session_id: u32, data: TaskMessage) -> io::Result<()> { ++ if DiedSession::get(session_id).await { ++ return Err(Error::new(ErrorKind::NotFound, "session already died")); ++ } ++ let mut fd = 0; ++ { ++ let instance = Self::get_instance(); ++ let mut map = instance.map.lock().unwrap(); ++ let Some(arc_wr) = map.get(&session_id) else { ++ return Err(Error::new(ErrorKind::NotFound, "session not found")); ++ }; ++ fd = arc_wr.fd; ++ } ++ let body = serializer::concat_pack(data); ++ let head = build_header(session_id, 1, body.len()); ++ let instance = Self::get_instance(); ++ let _guard = instance.lock.lock().unwrap(); ++ let mut child_ret = 0; ++ match usb_write_all(fd, head) { ++ Ok(_) => {} ++ Err(e) => { ++ return Err(Error::new(ErrorKind::Other, "Error writing head")); ++ } ++ } ++ ++ match usb_write_all(fd, body) { ++ Ok(ret) => { ++ child_ret = ret; ++ } ++ Err(e) => { ++ return Err(Error::new(ErrorKind::Other, "Error writing body")); ++ } ++ } ++ ++ if ((child_ret % config::MAX_PACKET_SIZE_HISPEED) == 0) && (child_ret > 0) { ++ let tail = build_header(session_id, 0, 0); ++ // win32 send ZLP will block winusb driver and LIBUSB_TRANSFER_ADD_ZERO_PACKET not effect ++ // so, we send dummy packet to prevent zero packet generate ++ match usb_write_all(fd, tail) { ++ Ok(_) => {} ++ Err(e) => { ++ return Err(Error::new(ErrorKind::Other, "Error writing tail")); ++ } ++ } ++ } ++ Ok(()) ++ } ++ ++ pub async fn start(session_id: u32, wr: UsbWriter) { ++ let buffer_map = Self::get_instance(); ++ let mut try_times = 0; ++ let max_try_time = 10; ++ let wait_one_seconds = 1000; ++ loop { ++ try_times += 1; ++ if let Ok(mut map) = buffer_map.map.try_lock() { ++ map.insert(session_id, wr); ++ crate::error!("start usb session_id:{session_id} get lock success after try {try_times} times"); ++ break; ++ } else { ++ if try_times > max_try_time { ++ crate::error!("start usb session_id:{session_id} get lock failed will restart hdcd"); ++ std::process::exit(0); ++ } ++ crate::error!("start usb session_id:{session_id} try lock failed {try_times} times"); ++ std::thread::sleep(Duration::from_millis(wait_one_seconds)); ++ } ++ } ++ ConnectTypeMap::put(session_id, ConnectType::Usb("some_mount_point".to_string())).await; ++ } ++ ++ pub async fn end(session_id: u32) { ++ let buffer_map = Self::get_instance(); ++ let mut try_times = 0; ++ let max_try_time = 10; ++ let wait_ten_ms = 10; ++ loop { ++ try_times += 1; ++ if let Ok(mut map) = buffer_map.map.try_lock() { ++ let _ = map.remove(&session_id); ++ crate::error!("end usb session_id:{session_id} get lock success after try {try_times} times"); ++ break; ++ } else { ++ if try_times > max_try_time { ++ crate::error!("end usb session_id:{session_id} get lock failed will force break"); ++ break; ++ } ++ crate::warn!("end usb session_id:{session_id} get lock failed {try_times} times"); ++ std::thread::sleep(Duration::from_millis(wait_ten_ms)); ++ } ++ } ++ ConnectTypeMap::del(session_id).await; ++ } ++} ++ ++pub fn usb_start_recv(fd: i32, _session_id: u32) -> mpsc::BoundedReceiver<(TaskMessage, u32, u32)> { ++ let (tx, rx) = mpsc::bounded_channel::<(TaskMessage, u32, u32)>(config::USB_QUEUE_LEN); ++ ylong_runtime::spawn(async move { ++ let mut rd = UsbReader { fd }; ++ loop { ++ if let Err(e) = unpack_task_message(&mut rd, tx.clone()) { ++ crate::warn!("unpack task failed: {}, reopen fd...", e.to_string()); ++ break; ++ } ++ } ++ }); ++ rx ++} ++ ++pub fn unpack_task_message(rd: &mut dyn Reader, tx: BoundedSender<(TaskMessage, u32, u32)>) -> io::Result<()> { ++ let (pack_size, package_index, session_id) = rd.check_protocol_head()?; ++ if pack_size == 0 { ++ return Ok(()); ++ } ++ ++ let data = rd.read_frame(pack_size as usize)?; ++ ylong_runtime::spawn(async move { ++ let (head, body) = data.split_at(serializer::HEAD_SIZE); ++ let payload_head = serializer::unpack_payload_head(head.to_vec()); ++ match payload_head { ++ Ok(payload_head) => { ++ let expected_head_size = u16::from_be(payload_head.head_size) as usize; ++ let expected_data_size = u32::from_be(payload_head.data_size) as usize; ++ ++ if serializer::HEAD_SIZE + expected_head_size + expected_data_size != pack_size as usize { ++ crate::warn!( ++ "protocol size diff: {pack_size} != {} + {expected_head_size} + {expected_data_size}", ++ serializer::HEAD_SIZE ++ ); ++ } ++ ++ if expected_head_size + expected_data_size == 0 ++ || expected_head_size + expected_data_size > config::HDC_BUF_MAX_SIZE ++ { ++ return Err(Error::new(ErrorKind::Other, "Packet size incorrect")); ++ } ++ ++ let (protect, payload) = body.split_at(expected_head_size); ++ ++ let payload_protect = serializer::unpack_payload_protect(protect.to_vec())?; ++ let channel_id = payload_protect.channel_id; ++ ++ let command = match HdcCommand::try_from(payload_protect.command_flag) { ++ Ok(command) => command, ++ Err(_) => { ++ return Err(Error::new(ErrorKind::Other, "unknown command")); ++ } ++ }; ++ ++ let _ = tx ++ .send((TaskMessage { channel_id, command, payload: payload.to_vec() }, package_index, session_id)) ++ .await; ++ Ok(()) ++ } ++ Err(e) => Err(utils::error_other(format!("usb unpack_task_message, err:{:?}", e))), ++ } ++ }); ++ ++ Ok(()) ++} +diff --git a/hdc_rust/src/host/client.rs b/hdc_rust/src/host/client.rs +index eac1a135bef63f48763259bb634e7fafe9b596a4..c345af557355887c055613b2f9a1e4ae2b3402fa 100644 +--- a/hdc_rust/src/host/client.rs ++++ b/hdc_rust/src/host/client.rs +@@ -17,8 +17,11 @@ use super::server; + + use hdc::common::base; + use hdc::common::base::Base; ++use hdc::common::hdcfile::{self, FileTaskMap, HdcFile}; ++use hdc::config::TaskMessage; + use hdc::config::{self, HdcCommand}; +-use hdc::transfer; ++use hdc::transfer::buffer::RemoteTaskMap; ++use hdc::transfer::{ChannelMap, TcpMap}; + use hdc::utils; + #[allow(unused)] + use hdc::utils::hdc_log::*; +@@ -30,12 +33,11 @@ use std::io::{self, Error, ErrorKind, Write}; + #[cfg(not(target_os = "windows"))] + use std::os::fd::AsRawFd; + +-#[cfg(featrue = "host")] ++#[cfg(feature = "host")] + extern crate ylong_runtime_static as ylong_runtime; + #[cfg(not(target_os = "windows"))] + use ylong_runtime::io::AsyncReadExt; +-use ylong_runtime::io::AsyncWriteExt; +-use ylong_runtime::net::{SplitWriteHalf, TcpStream}; ++use ylong_runtime::net::TcpStream; + + #[cfg(target_os = "windows")] + use crate::tty_utility::*; +@@ -45,13 +47,11 @@ extern "C" { + fn getch() -> libc::c_int; + } + +- + #[allow(unused)] + pub struct Client { + command: HdcCommand, + params: Vec, + connect_key: String, +- wr: SplitWriteHalf, + } + + pub async fn run_client_mode(parsed_cmd: ParsedCommand) -> io::Result<()> { +@@ -101,15 +101,9 @@ impl Client { + }; + + let (rd, wr) = stream.into_split(); +- +- transfer::ChannelMap::start(rd).await; +- +- Ok(Self { +- command, +- params: parsed_cmd.parameters, +- connect_key, +- wr, +- }) ++ ChannelMap::start(rd).await; ++ TcpMap::start(0, wr).await; ++ Ok(Self { command, params: parsed_cmd.parameters, connect_key }) + } + + async fn execute_command(&mut self) -> io::Result<()> { +@@ -175,13 +169,17 @@ impl Client { + + async fn send(&mut self, buf: &[u8]) { + hdc::debug!("channel send buf: {:#?}", buf); +- let msg = [u32::to_be_bytes(buf.len() as u32).as_slice(), buf].concat(); +- let _ = self.wr.write_all(msg.as_slice()).await; ++ let _ = TcpMap::send_channel_message(0, buf.to_vec()).await; + } + + async fn recv(&mut self) -> io::Result> { + hdc::debug!("channel recv buf"); +- transfer::ChannelMap::recv().await ++ ChannelMap::recv().await ++ } ++ ++ async fn handle_task(&mut self) -> io::Result { ++ hdc::debug!("channel handle task"); ++ ChannelMap::handle_task().await + } + + async fn unity_task(&mut self) -> io::Result<()> { +@@ -206,9 +204,8 @@ impl Client { + self.loop_recv().await + } + +- + #[cfg(target_os = "windows")] +- async fn shell_task(&mut self) -> io::Result<()> { ++ async fn shell_task(&mut self) -> io::Result<()> { + let cmd = match self.params.len() { + 1 => "shell\0".to_string(), + _ => self.params.join(" "), +@@ -218,7 +215,7 @@ impl Client { + + let _handle = ylong_runtime::spawn(async move { + loop { +- match transfer::ChannelMap::recv().await { ++ match ChannelMap::recv().await { + Ok(recv) => { + let _ = utils::print_msg(recv).await; + } +@@ -266,7 +263,7 @@ impl Client { + + let _handle = ylong_runtime::spawn(async move { + loop { +- match transfer::ChannelMap::recv().await { ++ match ChannelMap::recv().await { + Ok(recv) => { + let _ = utils::print_msg(recv).await; + } +@@ -348,33 +345,24 @@ impl Client { + if self.command == HdcCommand::FileInit || self.command == HdcCommand::FileRecvInit { + let command_field_count = 2; + let current_dir = env::current_dir()?; +- let mut s = current_dir.display().to_string(); +- s.push(Base::get_path_sep()); +- params.insert(command_field_count, "-cwd".to_string()); +- params.insert(command_field_count + 1, s.clone()); ++ let s = format!("\"{}{}\"", current_dir.display(), Base::get_path_sep()); ++ params.insert(command_field_count, config::CMDSTR_REMOTE_PARAMETER.to_string()); ++ // it make all the file task to be remote task ++ params.insert(command_field_count + 1, "-cwd".to_string()); ++ params.insert(command_field_count + 2, s.clone()); + } +- + self.send(params.join(" ").as_bytes()).await; +- self.loop_recv().await ++ self.loop_task().await + } + + async fn loop_recv(&mut self) -> io::Result<()> { + loop { + let recv = self.recv().await; + match recv { +- Ok(recv) => { +- hdc::debug!( +- "recv: {:#?}", +- recv.iter() +- .map(|c| format!("{c:02x}")) +- .collect::>() +- .join(" ") +- ); +- match String::from_utf8(recv) { +- Ok(msg) => print!("{msg}"), +- Err(err) => return Err(Error::new(ErrorKind::Other, format!("recv data to str failed, {err}"))), +- } +- } ++ Ok(recv) => match String::from_utf8(recv) { ++ Ok(msg) => print!("{msg}"), ++ Err(err) => return Err(Error::new(ErrorKind::Other, format!("recv data to str failed, {err}"))), ++ }, + Err(e) => { + return Err(e); + } +@@ -382,6 +370,21 @@ impl Client { + } + } + ++ async fn loop_task(&mut self) -> io::Result<()> { ++ loop { ++ let task = self.handle_task(); ++ let handle = match task.await { ++ Ok(task) => { ++ ylong_runtime::spawn(session_file_task(task)) ++ } ++ Err(e) => { ++ return Err(e); ++ } ++ }; ++ let _ = handle.await?; ++ } ++ } ++ + async fn loop_recv_waitfor(&mut self) -> io::Result<()> { + loop { + let recv = self.recv().await; +@@ -433,10 +436,7 @@ impl Client { + Ok(recv) => { + hdc::debug!( + "app_install_task recv: {:#?}", +- recv.iter() +- .map(|c| format!("{c:02x}")) +- .collect::>() +- .join(" ") ++ recv.iter().map(|c| format!("{c:02x}")).collect::>().join(" ") + ); + match String::from_utf8(recv) { + Ok(msg) => print!("{}", msg), +@@ -586,3 +586,58 @@ fn recover_terminal(termios: libc::termios) -> io::Result<()> { + } + } + } ++ ++#[allow(unused)] ++async fn session_file_task(task_message: TaskMessage) -> io::Result<()> { ++ let session_id = 0; ++ match task_message.command { ++ HdcCommand::FileCheck | HdcCommand::FileInit => { ++ RemoteTaskMap::add(task_message.channel_id, true).await; ++ if !FileTaskMap::exsit(session_id, task_message.channel_id).await { ++ let mut task = HdcFile::new(session_id, task_message.channel_id); ++ task.transfer.server_or_daemon = true; ++ FileTaskMap::put(session_id, task_message.channel_id, task).await; ++ } ++ hdcfile::command_dispatch( ++ session_id, ++ task_message.channel_id, ++ task_message.command, ++ &task_message.payload, ++ task_message.payload.len() as u16, ++ ) ++ .await; ++ return Ok(()); ++ } ++ HdcCommand::FileBegin | HdcCommand::FileData | HdcCommand::FileFinish => { ++ hdcfile::command_dispatch( ++ session_id, ++ task_message.channel_id, ++ task_message.command, ++ &task_message.payload, ++ task_message.payload.len() as u16, ++ ) ++ .await; ++ return Ok(()); ++ } ++ HdcCommand::KernelEchoRaw => { ++ hdc::debug!("kernel echo raw"); ++ echo_print_message(task_message.payload).await; ++ return Ok(()); ++ } ++ _ => echo_print_message(task_message.payload).await?, ++ } ++ Ok(()) ++} ++ ++async fn echo_print_message(data: Vec) -> io::Result<()> { ++ match String::from_utf8(data) { ++ Ok(s) => { ++ println!("{}", s); ++ Ok(()) ++ } ++ Err(e) => { ++ println!("error: {}", e); ++ Err(io::Error::new(io::ErrorKind::Other, "transfer from utf8 erorr")) ++ } ++ } ++} +diff --git a/hdc_rust/src/host/server.rs b/hdc_rust/src/host/server.rs +index 2f9d2cad27ffeaef0b591b5dea304d9475e3dbbb..e1d0c54b9b50e0ae3434ce49309593425d3de9a4 100644 +--- a/hdc_rust/src/host/server.rs ++++ b/hdc_rust/src/host/server.rs +@@ -267,6 +267,21 @@ async fn handle_client(stream: TcpStream) -> io::Result<()> { + } else if !target_list.is_empty() { + set_target_status(TargetStatus::Ready); + } ++ if transfer::buffer::RemoteTaskMap::get(channel_id).await { ++ hdc::debug!("RemoteTaskMap::get(channel_id) is true"); ++ match transfer::tcp::recv_channel_cmd(&mut rd, channel_id).await { ++ Ok(task_message) => { ++ hdc::debug!("data transfer to task successful, task_message: {:#?}", task_message,); ++ let session_id = task::get_valid_session_id(connect_key.clone(), channel_id).await?; ++ transfer::put(session_id, task_message).await; ++ } ++ Err(err) => { ++ hdc::error!("data transfer to task failed, {err}"); ++ return Err(err); ++ } ++ } ++ continue; ++ } + let recv = match transfer::tcp::recv_channel_message(&mut rd).await { + Ok(recv) => recv, + Err(err) => { +@@ -299,7 +314,7 @@ async fn handle_client(stream: TcpStream) -> io::Result<()> { + parsed = parser::exchange_parsed_for_daemon(parsed); + + hdc::debug!("parsed cmd: {:#?}", parsed); +- ++ check_remote_parameter(&parsed, channel_id).await; + if let Some(cmd) = parsed.command { + if let Err(e) = task::channel_task_dispatch(task::TaskInfo { + command: cmd, +@@ -356,3 +371,15 @@ fn unpack_channel_handshake(recv: Vec) -> io::Result { + Err(Error::new(ErrorKind::Other, "unpack connect key failed")) + } + } ++ ++async fn check_remote_parameter(parsed: &parser::Parsed, channel_id: u32) { ++ for param in &parsed.parameters { ++ if param == config::CMDSTR_REMOTE_PARAMETER { ++ transfer::buffer::RemoteTaskMap::add(channel_id, false).await; ++ return; ++ } ++ if param == "-cwd" { ++ return; ++ } ++ } ++} +diff --git a/hdc_rust/src/host/task.rs b/hdc_rust/src/host/task.rs +index 10a1e8ead5eee49b26c728883acb9f13f6e399f6..73c5f0ec6c23c4797a481c3ab691a8037dd65fe9 100644 +--- a/hdc_rust/src/host/task.rs ++++ b/hdc_rust/src/host/task.rs +@@ -24,6 +24,7 @@ use hdc::common::hdcfile::{self, FileTaskMap, HdcFile}; + use hdc::config::{ConnectType, HdcCommand}; + use hdc::host_transfer::host_usb; + use hdc::transfer; ++use hdc::transfer::buffer::RemoteTaskMap; + use hdc::transfer::send_channel_data; + use hdc::utils; + #[allow(unused)] +@@ -36,7 +37,7 @@ use std::sync::Arc; + extern crate ylong_runtime_static as ylong_runtime; + use ylong_runtime::net::SplitReadHalf; + use ylong_runtime::net::TcpStream; +-use ylong_runtime::sync::{Mutex, RwLock, mpsc}; ++use ylong_runtime::sync::{mpsc, Mutex, RwLock}; + + use crate::host_app::HostAppTask; + +@@ -62,7 +63,7 @@ pub async fn channel_task_dispatch(task_info: TaskInfo) -> io::Result<()> { + HdcCommand::UnityReboot => { + send_to_daemon(task_info, HdcCommand::UnityReboot, 0, true).await?; + } +- | HdcCommand::UnityRemount => { ++ HdcCommand::UnityRemount => { + send_to_daemon(task_info, HdcCommand::UnityRemount, 2, false).await?; + } + HdcCommand::UnityExecute | HdcCommand::ShellInit | HdcCommand::ShellData => { +@@ -98,7 +99,11 @@ pub async fn channel_task_dispatch(task_info: TaskInfo) -> io::Result<()> { + channel_file_task(task_info).await?; + } + HdcCommand::FileRecvInit => { +- send_to_daemon(task_info, HdcCommand::FileInit, 2, false).await?; ++ let mut remote_param = 0; ++ if RemoteTaskMap::get(task_info.channel_id).await { ++ remote_param = 1; ++ } ++ send_to_daemon(task_info, HdcCommand::FileInit, 2 + remote_param, false).await?; + } + HdcCommand::UnityHilog => { + channel_hilog_task(task_info).await?; +@@ -190,6 +195,14 @@ async fn channel_forward_remove(task_info: TaskInfo, forward_or_reverse: bool) - + } + let forward_channel_id = forward::ForwardTaskMap::get_channel_id(session_id, task_string.clone()).await; + if let Some(_channel_id) = forward_channel_id { ++ let Some(task) = ForwardTaskMap::get(session_id, _channel_id).await else { ++ return Err(io::Error::new(io::ErrorKind::Other, "task is not exist")); ++ }; ++ let vec_none = Vec::::new(); ++ let task = &mut task.clone(); ++ let cid = task.context_forward.id; ++ forward::send_to_task( ++ session_id, _channel_id, HdcCommand::ForwardFreeContext, &vec_none, 0, cid).await; + forward::free_channel_task(session_id, _channel_id).await; + } + let message_str = format!("Remove forward ruler success, ruler:{}", task_string); +@@ -315,10 +328,19 @@ async fn channel_file_task(task_info: TaskInfo) -> io::Result<()> { + )); + } + } +- let _ = host_app::command_dispatch(session_id, task_info.channel_id, task_info.command, &payload) .await; ++ let _ = host_app::command_dispatch(session_id, task_info.channel_id, task_info.command, &payload).await; + } + + HdcCommand::FileCheck | HdcCommand::FileInit => { ++ if transfer::buffer::RemoteTaskMap::get(task_info.channel_id).await { ++ transfer::send_to_another( ++ task_info.channel_id, ++ TaskMessage { channel_id: task_info.channel_id, command: task_info.command, payload }, ++ ) ++ .await; ++ ++ return Ok(()); ++ } + if !FileTaskMap::exsit(session_id, task_info.channel_id).await { + let mut task = HdcFile::new(session_id, task_info.channel_id); + task.transfer.server_or_daemon = true; +@@ -605,11 +627,7 @@ async fn session_task_dispatch(task_message: TaskMessage, session_id: u32, conne + match level_result { + Ok(level) => { + if let Ok(str) = String::from_utf8(data) { +- if let Err(e) = transfer::send_channel_msg( +- task_message.channel_id, +- level, +- str, +- ).await { ++ if let Err(e) = transfer::send_channel_msg(task_message.channel_id, level, str).await { + hdc::error!("echo to client failed: {}", e.to_string()); + }; + } +@@ -623,6 +641,7 @@ async fn session_task_dispatch(task_message: TaskMessage, session_id: u32, conne + transfer::send_channel_data(task_message.channel_id, task_message.payload).await; + } + HdcCommand::KernelChannelClose => { ++ transfer::buffer::RemoteTaskMap::del(task_message.channel_id).await; + session_channel_close(task_message, session_id).await?; + } + HdcCommand::KernelHandshake => { +@@ -676,6 +695,16 @@ async fn session_forward_success(task_message: TaskMessage, session_id: u32) -> + } + + async fn session_file_task(task_message: TaskMessage, session_id: u32) -> io::Result<()> { ++ if transfer::buffer::RemoteTaskMap::get(task_message.channel_id).await { ++ // 有remote字段就不在server走file流程,直接发给client ++ hdc::debug!( ++ "channel {} has remote field, send to client, task_message: {:?}", ++ task_message.channel_id, ++ task_message ++ ); ++ transfer::send_to_another(task_message.channel_id, task_message).await; ++ return Ok(()); ++ } + match task_message.command { + HdcCommand::AppBegin | HdcCommand::AppFinish => { + let _ = host_app::command_dispatch( +@@ -719,46 +748,6 @@ async fn session_file_task(task_message: TaskMessage, session_id: u32) -> io::Re + hdc::info!("other tasks"); + } + } +- /* ActionType 未定义,临时屏蔽 +- let channel_id = task_message.channel_id; +- let command = task_message.command; +- +- let opt = admin_session(ActionType::Query(session_id)).await; +- if opt.is_none() { +- admin_session(ActionType::Add(HdcSession::new( +- session_id, +- String::from(""), +- NodeType::Server, +- ConnectType::Tcp, +- ))) +- .await; +- } +- let opt = admin_session(ActionType::Query(session_id)).await; +- +- let arc = opt.unwrap(); +- let mut session = arc.lock().await; +- if let std::collections::hash_map::Entry::Vacant(e) = session.map_tasks.entry(channel_id) { +- match command { +- HdcCommand::AppBegin => { +- let mut task = HostAppTask::new(session_id, channel_id); +- task.transfer.server_or_daemon = true; +- e.insert(Arc::new(Mutex::new(task))); +- } +- HdcCommand::FileInit => { +- let mut task = HdcFile::new(session_id, channel_id); +- task.transfer.server_or_daemon = true; +- e.insert(Arc::new(Mutex::new(task))); +- } +- _ => { +- hdc::info!("other tasks"); +- } +- } +- } +- let task = session.map_tasks.get(&channel_id).unwrap(); +- let task_ = &mut task.lock().await; +- let cmd = task_message.payload; +- let _ = task_.command_dispatch(command, &cmd[..], cmd.len() as u16); +- */ + Ok(()) + } + +@@ -884,7 +873,7 @@ impl ConnectMap { + }); + if guard.daemon_auth_status == DAEOMN_UNAUTHORIZED { + output.push("Unauthorized"); +- } else { ++ } else { + output.push(match guard.conn_status { + ConnectStatus::Connected => "Connected", + ConnectStatus::Ready => "Ready", +@@ -931,7 +920,7 @@ impl ConnectMap { + } + } + +-async fn get_valid_session_id(connect_key: String, channel_id: u32) -> io::Result { ++pub async fn get_valid_session_id(connect_key: String, channel_id: u32) -> io::Result { + match ConnectMap::get_session_id(connect_key).await { + Some(session_id) => Ok(session_id), + None => { +diff --git a/hdc_rust/src/lib.rs b/hdc_rust/src/lib.rs +index 5559b89d32999fb8ca4d9b52a0d35e9a2b70b227..660fcbe21027c92d5865858a411b7f4355c9657e 100644 +--- a/hdc_rust/src/lib.rs ++++ b/hdc_rust/src/lib.rs +@@ -20,6 +20,8 @@ pub mod common; + pub mod config; + #[cfg(not(feature = "host"))] + pub mod daemon_lib; ++#[cfg(not(feature = "host"))] ++pub mod daemon_transfer; + #[cfg(feature = "host")] + pub mod host_transfer; + pub mod serializer; +diff --git a/hdc_rust/src/transfer.rs b/hdc_rust/src/transfer.rs +index 0c3a60b063a5a04a43a8ca5ca97da0baf3d2f7b1..8648955f26b4d93591bccec54abf120cf60c5856 100644 +--- a/hdc_rust/src/transfer.rs ++++ b/hdc_rust/src/transfer.rs +@@ -19,18 +19,16 @@ pub mod buffer; + pub mod tcp; + pub mod uart; + pub mod uart_wrapper; +-pub mod usb; + pub use buffer::dump_session; + pub use buffer::put; + pub use buffer::send_channel_data; + pub use buffer::send_channel_msg; +-pub use buffer::usb_start_recv; ++pub use buffer::send_to_another; + pub use buffer::ChannelMap; ++pub use buffer::ConnectTypeMap; + pub use buffer::EchoLevel; + pub use buffer::TcpMap; + pub use buffer::UartMap; +-pub use buffer::UsbMap; +-pub use buffer::ConnectTypeMap; + pub use uart::uart_close; + pub use uart_wrapper::start_session; + pub use uart_wrapper::start_uart; +diff --git a/hdc_rust/src/transfer/base.rs b/hdc_rust/src/transfer/base.rs +index b26d97c89aa3b885cff4f0b89b5dd3b7c5818a58..5bec091891d7331e8751edcbcbdba46f0807784a 100644 +--- a/hdc_rust/src/transfer/base.rs ++++ b/hdc_rust/src/transfer/base.rs +@@ -37,11 +37,7 @@ pub struct CheckCompressVersion {} + impl CheckCompressVersion { + pub fn get_instance() -> BOOL_ { + static mut CAN_COMPRESS: Option = Option::None; +- unsafe { +- CAN_COMPRESS +- .get_or_insert_with(|| Arc::new(Mutex::new(false))) +- .clone() +- } ++ unsafe { CAN_COMPRESS.get_or_insert_with(|| Arc::new(Mutex::new(false))).clone() } + } + + pub async fn set(check_version: bool) { +@@ -72,9 +68,7 @@ pub trait Reader: Send + Sync + 'static { + } + + pub async fn unpack_task_message_lock( +- rd: &mut dyn Reader, +- pack_size: u32, +- tx: BoundedSender, ++ rd: &mut dyn Reader, pack_size: u32, tx: BoundedSender, + ) -> io::Result<()> { + let data = rd.read_frame(pack_size as usize)?; + let (head, body) = data.split_at(serializer::HEAD_SIZE); +@@ -84,8 +78,7 @@ pub async fn unpack_task_message_lock( + let expected_head_size = u16::from_be(payload_head.head_size) as usize; + let expected_data_size = u32::from_be(payload_head.data_size) as usize; + +- if serializer::HEAD_SIZE + expected_head_size + expected_data_size != pack_size as usize +- { ++ if serializer::HEAD_SIZE + expected_head_size + expected_data_size != pack_size as usize { + crate::warn!( + "protocol size diff: {pack_size} != {} + {expected_head_size} + {expected_data_size}", + serializer::HEAD_SIZE +@@ -111,13 +104,7 @@ pub async fn unpack_task_message_lock( + }; + let mut remaining = (expected_data_size - payload.len()) as i32; + if remaining == 0 { +- let _ = tx +- .send(TaskMessage { +- channel_id, +- command, +- payload: payload.to_vec(), +- }) +- .await; ++ let _ = tx.send(TaskMessage { channel_id, command, payload: payload.to_vec() }).await; + } + let mut total_payload = payload.to_vec(); + while remaining > 0 { +@@ -135,13 +122,7 @@ pub async fn unpack_task_message_lock( + remaining -= packet_size as i32; + crate::debug!("remaining:{}, packet_size:{}", remaining, packet_size); + if remaining == 0 { +- let _ = tx +- .send(TaskMessage { +- channel_id, +- command, +- payload: total_payload, +- }) +- .await; ++ let _ = tx.send(TaskMessage { channel_id, command, payload: total_payload }).await; + break; + } + } +@@ -151,82 +132,9 @@ pub async fn unpack_task_message_lock( + } + } + +- let _ = tx +- .send(TaskMessage { +- channel_id, +- command: HdcCommand::UartFinish, +- payload: vec![], +- }) +- .await; ++ let _ = tx.send(TaskMessage { channel_id, command: HdcCommand::UartFinish, payload: vec![] }).await; + Ok(()) + } +- Err(e) => { +- Err(utils::error_other(format!("uart unpack_task_message_lock, err:{:?}", e))) +- } +- } +-} +- +-pub fn unpack_task_message( +- rd: &mut dyn Reader, +- tx: BoundedSender<(TaskMessage, u32, u32)>, +-) -> io::Result<()> { +- let (pack_size, package_index, session_id) = rd.check_protocol_head()?; +- if pack_size == 0 { +- return Ok(()); ++ Err(e) => Err(utils::error_other(format!("uart unpack_task_message_lock, err:{:?}", e))), + } +- +- let data = rd.read_frame(pack_size as usize)?; +- ylong_runtime::spawn(async move { +- let (head, body) = data.split_at(serializer::HEAD_SIZE); +- let payload_head = serializer::unpack_payload_head(head.to_vec()); +- match payload_head { +- Ok(payload_head) => { +- let expected_head_size = u16::from_be(payload_head.head_size) as usize; +- let expected_data_size = u32::from_be(payload_head.data_size) as usize; +- +- if serializer::HEAD_SIZE + expected_head_size + expected_data_size != pack_size as usize { +- crate::warn!( +- "protocol size diff: {pack_size} != {} + {expected_head_size} + {expected_data_size}", +- serializer::HEAD_SIZE +- ); +- } +- +- if expected_head_size + expected_data_size == 0 +- || expected_head_size + expected_data_size > HDC_BUF_MAX_SIZE +- { +- return Err(Error::new(ErrorKind::Other, "Packet size incorrect")); +- } +- +- let (protect, payload) = body.split_at(expected_head_size); +- +- let payload_protect = serializer::unpack_payload_protect(protect.to_vec())?; +- let channel_id = payload_protect.channel_id; +- +- let command = match HdcCommand::try_from(payload_protect.command_flag) { +- Ok(command) => command, +- Err(_) => { +- return Err(Error::new(ErrorKind::Other, "unknown command")); +- } +- }; +- +- let _ = tx +- .send(( +- TaskMessage { +- channel_id, +- command, +- payload: payload.to_vec(), +- }, +- package_index, +- session_id, +- )) +- .await; +- Ok(()) +- } +- Err(e) => { +- Err(utils::error_other(format!("usb unpack_task_message, err:{:?}", e))) +- } +- } +- }); +- +- Ok(()) + } +diff --git a/hdc_rust/src/transfer/buffer.rs b/hdc_rust/src/transfer/buffer.rs +index 3c56454cc403cb1bd1c9f3a39ca01e89e2eecf91..c3ccc56f0af11d6363913740fc7c6af7a61f6974 100644 +--- a/hdc_rust/src/transfer/buffer.rs ++++ b/hdc_rust/src/transfer/buffer.rs +@@ -15,35 +15,34 @@ + //! buffer + #![allow(missing_docs)] + +-use super::base::{self, Writer}; ++use super::base::Writer; + use super::uart::UartWriter; +-use super::usb::{self, UsbReader, UsbWriter}; + use super::{tcp, uart_wrapper}; + #[cfg(feature = "emulator")] + use crate::daemon_lib::bridge::BridgeMap; ++#[cfg(not(feature = "host"))] ++use crate::daemon_transfer::usb::UsbMap; + #[cfg(feature = "host")] + use crate::host_transfer::host_usb::HostUsbMap; + + use crate::config::TaskMessage; + use crate::config::{self, ConnectType}; ++#[cfg(not(feature = "host"))] ++use crate::daemon_lib::task_manager; + use crate::serializer; + #[allow(unused)] + use crate::utils::hdc_log::*; +-#[cfg(not(feature = "host"))] +-use crate::daemon_lib::task_manager; + use std::collections::{HashMap, HashSet, VecDeque}; + use std::io::{self, Error, ErrorKind}; ++use std::mem::MaybeUninit; + use std::sync::Arc; + use std::sync::Once; +-use std::mem::MaybeUninit; +-use crate::transfer::usb::usb_write_all; +-use std::time::Duration; + + #[cfg(feature = "host")] + extern crate ylong_runtime_static as ylong_runtime; + use ylong_runtime::io::AsyncWriteExt; + use ylong_runtime::net::{SplitReadHalf, SplitWriteHalf}; +-use ylong_runtime::sync::{mpsc, Mutex, RwLock}; ++use ylong_runtime::sync::{Mutex, RwLock}; + + type ConnectTypeMap_ = Arc>>; + +@@ -51,11 +50,7 @@ pub struct ConnectTypeMap {} + impl ConnectTypeMap { + fn get_instance() -> ConnectTypeMap_ { + static mut CONNECT_TYPE_MAP: Option = None; +- unsafe { +- CONNECT_TYPE_MAP +- .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new()))) +- .clone() +- } ++ unsafe { CONNECT_TYPE_MAP.get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new()))).clone() } + } + + pub async fn put(session_id: u32, conn_type: ConnectType) { +@@ -111,15 +106,13 @@ pub struct TcpMap { + map: Mutex>, + } + impl TcpMap { +- pub(crate) fn get_instance() -> &'static TcpMap { ++ pub(crate) fn get_instance() -> &'static TcpMap { + static mut TCP_MAP: MaybeUninit = MaybeUninit::uninit(); + static ONCE: Once = Once::new(); + + unsafe { + ONCE.call_once(|| { +- let global = TcpMap { +- map: Mutex::new(HashMap::new()) +- }; ++ let global = TcpMap { map: Mutex::new(HashMap::new()) }; + TCP_MAP = MaybeUninit::new(global); + }); + &*TCP_MAP.as_ptr() +@@ -141,16 +134,9 @@ impl TcpMap { + pub async fn send_channel_message(channel_id: u32, buf: Vec) -> io::Result<()> { + crate::trace!( + "send channel({channel_id}) msg: {:?}", +- buf.iter() +- .map(|&c| format!("{c:02X}")) +- .collect::>() +- .join(" ") ++ buf.iter().map(|&c| format!("{c:02X}")).collect::>().join(" ") + ); +- let send = [ +- u32::to_be_bytes(buf.len() as u32).as_slice(), +- buf.as_slice(), +- ] +- .concat(); ++ let send = [u32::to_be_bytes(buf.len() as u32).as_slice(), buf.as_slice()].concat(); + let instance = Self::get_instance(); + let map = instance.map.lock().await; + if let Some(guard) = map.get(&channel_id) { +@@ -182,123 +168,6 @@ impl TcpMap { + } + } + +-pub struct UsbMap { +- map: std::sync::Mutex>, +- lock: std::sync::Mutex, +-} +-impl UsbMap { +- pub(crate) fn get_instance() -> &'static UsbMap { +- static mut USB_MAP: MaybeUninit = MaybeUninit::uninit(); +- static ONCE: Once = Once::new(); +- +- unsafe { +- ONCE.call_once(|| { +- let global = UsbMap { +- map: std::sync::Mutex::new(HashMap::new()), +- lock: std::sync::Mutex::new(0) +- }; +- USB_MAP = MaybeUninit::new(global); +- }); +- &*USB_MAP.as_ptr() +- } +- } +- +- #[allow(unused)] +- async fn put(session_id: u32, data: TaskMessage) -> io::Result<()> { +- if DiedSession::get(session_id).await { +- return Err(Error::new(ErrorKind::NotFound, "session already died"));; +- } +- let mut fd = 0; +- { +- let instance = Self::get_instance(); +- let mut map = instance.map.lock().unwrap(); +- let Some(arc_wr) = map.get(&session_id) else { +- return Err(Error::new(ErrorKind::NotFound, "session not found")); +- }; +- fd =arc_wr.fd; +- } +- let body = serializer::concat_pack(data); +- let head = usb::build_header(session_id, 1, body.len()); +- let instance = Self::get_instance(); +- let _guard = instance.lock.lock().unwrap(); +- let mut child_ret = 0; +- match usb_write_all(fd, head) { +- Ok(_) => {} +- Err(e) => { +- return Err(Error::new(ErrorKind::Other, "Error writing head")); +- } +- } +- +- match usb_write_all(fd, body) { +- Ok(ret) => { +- child_ret = ret; +- } +- Err(e) => { +- return Err(Error::new(ErrorKind::Other, "Error writing body")); +- } +- } +- +- if ((child_ret % config::MAX_PACKET_SIZE_HISPEED) == 0) && (child_ret > 0) { +- let tail = usb::build_header(session_id, 0, 0); +- // win32 send ZLP will block winusb driver and LIBUSB_TRANSFER_ADD_ZERO_PACKET not effect +- // so, we send dummy packet to prevent zero packet generate +- match usb_write_all(fd, tail) { +- Ok(_) => {} +- Err(e) => { +- return Err(Error::new(ErrorKind::Other, "Error writing tail")); +- } +- } +- } +- Ok(()) +- } +- +- pub async fn start(session_id: u32, wr: UsbWriter) { +- let buffer_map = Self::get_instance(); +- let mut try_times = 0; +- let max_try_time = 10; +- let wait_one_seconds = 1000; +- loop { +- try_times += 1; +- if let Ok(mut map) = buffer_map.map.try_lock() { +- map.insert(session_id, wr); +- crate::error!("start usb session_id:{session_id} get lock success after try {try_times} times"); +- break; +- } else { +- if try_times > max_try_time { +- crate::error!("start usb session_id:{session_id} get lock failed will restart hdcd"); +- std::process::exit(0); +- } +- crate::error!("start usb session_id:{session_id} try lock failed {try_times} times"); +- std::thread::sleep(Duration::from_millis(wait_one_seconds)); +- } +- } +- ConnectTypeMap::put(session_id, ConnectType::Usb("some_mount_point".to_string())).await; +- } +- +- pub async fn end(session_id: u32) { +- let buffer_map = Self::get_instance(); +- let mut try_times = 0; +- let max_try_time = 10; +- let wait_ten_ms = 10; +- loop { +- try_times += 1; +- if let Ok(mut map) = buffer_map.map.try_lock() { +- let _ = map.remove(&session_id); +- crate::error!("end usb session_id:{session_id} get lock success after try {try_times} times"); +- break; +- } else { +- if try_times > max_try_time { +- crate::error!("end usb session_id:{session_id} get lock failed will force break"); +- break; +- } +- crate::warn!("end usb session_id:{session_id} get lock failed {try_times} times"); +- std::thread::sleep(Duration::from_millis(wait_ten_ms)); +- } +- } +- ConnectTypeMap::del(session_id).await; +- } +-} +- + type UartWriter_ = Arc>; + type UartMap_ = Arc>>; + +@@ -306,11 +175,7 @@ pub struct UartMap {} + impl UartMap { + fn get_instance() -> UartMap_ { + static mut UART_MAP: Option = None; +- unsafe { +- UART_MAP +- .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new()))) +- .clone() +- } ++ unsafe { UART_MAP.get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new()))).clone() } + } + + #[allow(unused)] +@@ -343,7 +208,9 @@ pub async fn put(session_id: u32, data: TaskMessage) { + Some(ConnectType::Tcp) => { + TcpMap::put(session_id, data).await; + } +- Some(ConnectType::Usb(_mount_point)) => { ++ Some(ConnectType::Usb(_mount_point)) => ++ { ++ #[cfg(not(feature = "host"))] + if let Err(e) = UsbMap::put(session_id, data).await { + crate::error!("{e:?}"); + #[cfg(not(feature = "host"))] +@@ -358,7 +225,8 @@ pub async fn put(session_id: u32, data: TaskMessage) { + #[cfg(feature = "emulator")] + BridgeMap::put(session_id, data).await; + } +- Some(ConnectType::HostUsb(_mount_point)) => { ++ Some(ConnectType::HostUsb(_mount_point)) => ++ { + #[cfg(feature = "host")] + if let Err(e) = HostUsbMap::put(session_id, data).await { + crate::error!("{e:?}"); +@@ -387,7 +255,7 @@ impl EchoLevel { + 0 => Ok(Self::FAIL), + 1 => Ok(Self::INFO), + 2 => Ok(Self::OK), +- _ => Err(Error::new(ErrorKind::Other, "invalid message level type")) ++ _ => Err(Error::new(ErrorKind::Other, "invalid message level type")), + } + } + } +@@ -399,7 +267,13 @@ pub async fn send_channel_msg(channel_id: u32, level: EchoLevel, msg: String) -> + EchoLevel::RAW => msg.to_string() + "\r\n", + EchoLevel::OK => msg.to_string(), + }; +- TcpMap::send_channel_message(channel_id, data.as_bytes().to_vec()).await ++ if let Ok(is_client) = RemoteTaskMap::is_client(channel_id).await { ++ if is_client { ++ println!("{}", data); ++ return Ok(()); ++ } ++ } ++ TcpMap::send_channel_message(channel_id, data.into_bytes()).await + } + + // client recv and print msg +@@ -410,11 +284,7 @@ pub struct ChannelMap {} + impl ChannelMap { + fn get_instance() -> ChannelMap_ { + static mut TCP_RECVER: Option = None; +- unsafe { +- TCP_RECVER +- .get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new()))) +- .clone() +- } ++ unsafe { TCP_RECVER.get_or_insert_with(|| Arc::new(RwLock::new(HashMap::new()))).clone() } + } + + pub async fn start(rd: SplitReadHalf) { +@@ -433,20 +303,16 @@ impl ChannelMap { + let mut rd = arc_rd.lock().await; + tcp::recv_channel_message(&mut rd).await + } +-} + +-pub fn usb_start_recv(fd: i32, _session_id: u32) -> mpsc::BoundedReceiver<(TaskMessage, u32, u32)> { +- let (tx, rx) = mpsc::bounded_channel::<(TaskMessage, u32, u32)>(config::USB_QUEUE_LEN); +- ylong_runtime::spawn(async move { +- let mut rd = UsbReader { fd }; +- loop { +- if let Err(e) = base::unpack_task_message(&mut rd, tx.clone()) { +- crate::warn!("unpack task failed: {}, reopen fd...", e.to_string()); +- break; +- } +- } +- }); +- rx ++ pub async fn handle_task() -> io::Result { ++ let instance = Self::get_instance(); ++ let map = instance.read().await; ++ let Some(arc_rd) = map.get(&0) else { ++ return Err(Error::new(ErrorKind::NotFound, "channel not found")); ++ }; ++ let mut rd = arc_rd.lock().await; ++ tcp::recv_channel_cmd(&mut rd, 0u32).await ++ } + } + + pub struct DiedSession { +@@ -454,7 +320,7 @@ pub struct DiedSession { + queue: Arc>>, + } + impl DiedSession { +- pub(crate) fn get_instance() -> &'static DiedSession { ++ pub(crate) fn get_instance() -> &'static DiedSession { + static mut DIED_SESSION: MaybeUninit = MaybeUninit::uninit(); + static ONCE: Once = Once::new(); + +@@ -462,7 +328,7 @@ impl DiedSession { + ONCE.call_once(|| { + let global = DiedSession { + set: Arc::new(RwLock::new(HashSet::with_capacity(config::MAX_DIED_SESSION_NUM))), +- queue: Arc::new(RwLock::new(VecDeque::with_capacity(config::MAX_DIED_SESSION_NUM))) ++ queue: Arc::new(RwLock::new(VecDeque::with_capacity(config::MAX_DIED_SESSION_NUM))), + }; + DIED_SESSION = MaybeUninit::new(global); + }); +@@ -475,7 +341,7 @@ impl DiedSession { + let mut set = instance.set.write().await; + let mut queue = instance.queue.write().await; + if queue.len() >= config::MAX_DIED_SESSION_NUM { +- if let Some(front_session) = queue.pop_front(){ ++ if let Some(front_session) = queue.pop_front() { + set.remove(&front_session); + } + } +@@ -490,4 +356,69 @@ impl DiedSession { + let set = instance.set.read().await; + set.contains(&session_id) + } +-} +\ No newline at end of file ++} ++ ++pub struct RemoteTaskMap { ++ map: Arc>>, ++} ++impl RemoteTaskMap { ++ pub(crate) fn get_instance() -> &'static RemoteTaskMap { ++ static mut REMOTE_TASK: MaybeUninit = MaybeUninit::uninit(); ++ static ONCE: Once = Once::new(); ++ ++ unsafe { ++ ONCE.call_once(|| { ++ let global = RemoteTaskMap { map: Arc::new(RwLock::new(HashMap::new())) }; ++ REMOTE_TASK = MaybeUninit::new(global); ++ }); ++ &*REMOTE_TASK.as_ptr() ++ } ++ } ++ ++ pub async fn add(channel_id: u32, is_client: bool) { ++ let instance = Self::get_instance(); ++ let mut map = instance.map.write().await; ++ crate::debug!("add channel_id:{}, is_client:{}", channel_id, is_client); ++ map.entry(channel_id).or_insert(is_client); ++ } ++ ++ pub async fn get(channel_id: u32) -> bool { ++ let instance = Self::get_instance(); ++ let map = instance.map.read().await; ++ map.contains_key(&channel_id) ++ } ++ ++ pub async fn is_client(channel_id: u32) -> io::Result { ++ let instance = Self::get_instance(); ++ let map = instance.map.read().await; ++ match map.get(&channel_id) { ++ Some(is_client) => Ok(*is_client), ++ None => Err(Error::new(ErrorKind::NotFound, "channel not found")), ++ } ++ } ++ ++ pub async fn del(channel_id: u32) { ++ crate::debug!("RemoveTaskMap del channel_id:{}", channel_id); ++ let instance = Self::get_instance(); ++ let mut map = instance.map.write().await; ++ map.remove(&channel_id); ++ } ++} ++ ++pub async fn send_to_another(session_id: u32, data: TaskMessage) { ++ #[cfg(feature = "host")] ++ match RemoteTaskMap::is_client(data.channel_id).await { ++ Ok(_) => { ++ send_channel_with_cmd(data).await; ++ return; ++ } ++ Err(_e) => {} ++ } ++ put(session_id, data).await; ++} ++ ++#[cfg(feature = "host")] ++pub async fn send_channel_with_cmd(data: TaskMessage) { ++ let msg = [u16::to_le_bytes(data.command as u16).as_slice(), data.payload.as_slice()].concat(); ++ let _ = TcpMap::send_channel_message(data.channel_id, msg).await; ++} +diff --git a/hdc_rust/src/transfer/tcp.rs b/hdc_rust/src/transfer/tcp.rs +index a3ba30dfb4d76b991ec43fd473ea75cf3c83b261..93684cff7ef73da57cc97460ca08f893a9c60797 100644 +--- a/hdc_rust/src/transfer/tcp.rs ++++ b/hdc_rust/src/transfer/tcp.rs +@@ -56,19 +56,14 @@ async fn read_frame(rd: &mut SplitReadHalf, expected_size: usize) -> io::Result< + pub async fn unpack_task_message(rd: &mut SplitReadHalf) -> io::Result { + let data = read_frame(rd, serializer::HEAD_SIZE).await?; + let payload_head = serializer::unpack_payload_head(data)?; +- crate::trace!("get payload_head: {:?}", payload_head); +- + let expected_head_size = u16::from_be(payload_head.head_size) as usize; + let expected_data_size = u32::from_be(payload_head.data_size) as usize; +- if expected_head_size + expected_data_size == 0 +- || expected_head_size + expected_data_size > HDC_BUF_MAX_SIZE +- { ++ if expected_head_size + expected_data_size == 0 || expected_head_size + expected_data_size > HDC_BUF_MAX_SIZE { + return Err(Error::new(ErrorKind::Other, "Packet size incorrect")); + } + + let data = read_frame(rd, expected_head_size).await?; + let payload_protect = serializer::unpack_payload_protect(data)?; +- crate::trace!("get payload_protect: {:?}", payload_protect); + let channel_id = payload_protect.channel_id; + + let command = match HdcCommand::try_from(payload_protect.command_flag) { +@@ -79,21 +74,37 @@ pub async fn unpack_task_message(rd: &mut SplitReadHalf) -> io::Result io::Result> { + let data = read_frame(rd, 4).await?; + let Ok(data) = data.try_into() else { +- return Err(Error::new( +- ErrorKind::Other, +- "Data forced conversion failed", +- )); ++ return Err(Error::new(ErrorKind::Other, "Data forced conversion failed")); + }; + let expected_size = u32::from_be_bytes(data); + read_frame(rd, expected_size as usize).await + } ++ ++pub async fn recv_channel_cmd(rd: &mut SplitReadHalf, channel_id: u32) -> io::Result { ++ let data = read_frame(rd, 4).await?; ++ let Ok(data) = data.try_into() else { ++ return Err(Error::new(ErrorKind::Other, "Data forced conversion failed")); ++ }; ++ let expected_size = u32::from_be_bytes(data) as usize; ++ let command_data = read_frame(rd, 2).await?; ++ let payload = read_frame(rd, expected_size - 2).await?; ++ let Ok(command_data_array) = command_data.clone().try_into() else { ++ return Err(Error::new(ErrorKind::Other, "Data forced conversion failed")); ++ }; ++ let command_flag = u16::from_le_bytes(command_data_array); ++ let command = match HdcCommand::try_from(command_flag as u32) { ++ Ok(command) => command, ++ Err(_) => { ++ let mut payload_concat = command_data; ++ payload_concat.extend_from_slice(&payload); ++ return Ok(TaskMessage { channel_id, command: HdcCommand::Unknown, payload: payload_concat }); ++ } ++ }; ++ Ok(TaskMessage { channel_id, command, payload }) ++} +diff --git a/hdc_rust/src/transfer/usb.rs b/hdc_rust/src/transfer/usb.rs +deleted file mode 100644 +index 870cfe3f7177a425ff451e3995727b5d984af3ae..0000000000000000000000000000000000000000 +--- a/hdc_rust/src/transfer/usb.rs ++++ /dev/null +@@ -1,224 +0,0 @@ +-/* +- * Copyright (C) 2023 Huawei Device Co., Ltd. +- * Licensed under the Apache License, Version 2.0 (the "License"); +- * you may not use this file except in compliance with the License. +- * You may obtain a copy of the License at +- * +- * http://www.apache.org/licenses/LICENSE-2.0 +- * +- * Unless required by applicable law or agreed to in writing, software +- * distributed under the License is distributed on an "AS IS" BASIS, +- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- * See the License for the specific language governing permissions and +- * limitations under the License. +- */ +-//! usb +-#![allow(missing_docs)] +- +-use super::base; +- +-use crate::config; +-use crate::serializer; +-use crate::serializer::native_struct::UsbHead; +-use crate::serializer::pack_struct::UsbHeadPack; +-use crate::serializer::serialize::Serialization; +-use crate::serializer::serialize::SerializedBuffer; +-use crate::utils; +-#[allow(unused)] +-use crate::utils::hdc_log::*; +- +-#[cfg(not(target_os = "windows"))] +-use std::ffi::{CStr, CString}; +-use std::io::{self, Error, ErrorKind}; +-#[cfg(not(feature = "host"))] +-use libc::{fcntl, F_SETFD, FD_CLOEXEC}; +- +-#[repr(C)] +-pub struct PersistBuffer { +- pub ptr: *const libc::c_char, +- pub size: libc::c_ulonglong, +-} +- +-pub fn buf_to_vec(buf: PersistBuffer) -> Vec { +- let slice = +- unsafe { std::slice::from_raw_parts(buf.ptr as *const libc::c_uchar, buf.size as usize) }; +- slice.to_vec() +-} +- +-#[allow(unused)] +-extern "C" { +- fn access(_name: *const libc::c_char, _type: i32) -> i32; +- fn free(ptr: *const libc::c_void); +- +- fn ConfigEpPointEx(path: *const libc::c_char) -> i32; +- fn OpenEpPointEx(path: *const libc::c_char) -> i32; +- fn CloseUsbFdEx(fd: i32) -> i32; +- fn CloseEndPointEx(bulkIn: i32, bulkOut: i32, ctrlEp: i32, closeCtrlEp: u8); +- #[cfg(not(target_os = "windows"))] +- fn WriteUsbDevEx(bulkOut: i32, buf: SerializedBuffer) -> i32; +- #[cfg(not(target_os = "windows"))] +- fn ReadUsbDevEx(bulkIn: i32, buf: *mut u8, size: usize) -> usize; +- fn GetDevPathEx(path: *const libc::c_char) -> *const libc::c_char; +- +- fn SerializeUsbHead(value: *const UsbHeadPack) -> SerializedBuffer; +- fn ParseUsbHead(value: *mut UsbHeadPack, buf: SerializedBuffer) -> libc::c_uchar; +-} +- +-#[cfg(not(target_os = "windows"))] +-pub fn usb_init() -> io::Result<(i32, i32, i32)> { +- crate::info!("opening usb fd..."); +- let path = CString::new(config::USB_FFS_BASE).unwrap(); +- +- let base_path = unsafe { +- let p = GetDevPathEx(path.as_ptr()); +- let c_str = CStr::from_ptr(p); +- c_str.to_str().unwrap().to_string() +- }; +- // let c_str: &CStr = unsafe { CStr::from_ptr(p) }; +- // c_str.to_str().unwrap().to_string() +- // let base_path = serializer::ptr_to_string(unsafe { GetDevPathEx(path.as_ptr()) }); +- let ep0 = CString::new(base_path.clone() + "/ep0").unwrap(); +- let ep1 = CString::new(base_path.clone() + "/ep1").unwrap(); +- let ep2 = CString::new(base_path + "/ep2").unwrap(); +- if unsafe { access(ep0.as_ptr(), 0) } != 0 { +- return Err(utils::error_other("cannot access usb path".to_string())); +- } +- +- let config_fd = unsafe { ConfigEpPointEx(ep0.as_ptr()) }; +- if config_fd < 0 { +- return Err(utils::error_other("cannot open usb ep0".to_string())); +- } +- +- let bulkin_fd = unsafe { OpenEpPointEx(ep1.as_ptr()) }; +- if bulkin_fd < 0 { +- return Err(utils::error_other("cannot open usb ep1".to_string())); +- } +- +- let bulkout_fd = unsafe { OpenEpPointEx(ep2.as_ptr()) }; +- if bulkout_fd < 0 { +- return Err(utils::error_other("cannot open usb ep2".to_string())); +- } +- #[cfg(not(feature = "host"))] +- unsafe{ +- // cannot open with O_CLOEXEC, must fcntl +- fcntl(config_fd, F_SETFD, FD_CLOEXEC); +- fcntl(bulkin_fd, F_SETFD, FD_CLOEXEC); +- fcntl(bulkout_fd, F_SETFD, FD_CLOEXEC); +- } +- +- crate::info!("usb fd: {config_fd}, {bulkin_fd}, {bulkout_fd}"); +- +- Ok((config_fd, bulkin_fd, bulkout_fd)) +-} +- +-#[cfg(not(target_os = "windows"))] +-pub fn usb_close(config_fd: i32, bulkin_fd: i32, bulkout_fd: i32) { +- crate::info!("closing usb fd..."); +- unsafe { +- CloseUsbFdEx(config_fd); +- CloseUsbFdEx(bulkin_fd); +- CloseUsbFdEx(bulkout_fd); +- } +-} +- +-pub struct UsbReader { +- pub fd: i32, +-} +-pub struct UsbWriter { +- pub fd: i32, +-} +- +-impl base::Reader for UsbReader { +- // 屏蔽window编译报错 +- #[cfg(not(target_os = "windows"))] +- fn read_frame(&self, expect: usize) -> io::Result> { +- let mut buf :Vec = Vec::with_capacity(expect); +- unsafe { +- let readed = ReadUsbDevEx(self.fd, buf.as_mut_ptr() as *mut libc::uint8_t, expect); +- if readed != expect { +- Err( +- utils::error_other( +- format!( +- "usb read error, usb read failed: expect: {} acture: {}", +- expect, +- readed, +- ) +- ) +- ) +- } else { +- buf.set_len(readed); +- Ok(buf) +- } +- } +- } +- +- // 屏蔽window编译报错 +- #[cfg(target_os = "windows")] +- fn read_frame(&self, _expected_size: usize) -> io::Result> { +- Err(utils::error_other("usb read error".to_string())) +- } +- +- fn check_protocol_head(&mut self) -> io::Result<(u32, u32, u32)> { +- let buf = self.read_frame(serializer::USB_HEAD_SIZE)?; +- if buf[..config::USB_PACKET_FLAG.len()] != config::USB_PACKET_FLAG[..] { +- return Err(Error::new( +- ErrorKind::Other, +- format!("USB_PACKET_FLAG incorrect, content: {:#?}", buf), +- )); +- } +- let mut head = serializer::native_struct::UsbHead::default(); +- +- if let Err(e) = head.parse(buf) { +- crate::warn!("parse usb head error: {}", e.to_string()); +- return Err(e); +- } +- Ok((u32::from_be(head.data_size), 0, u32::to_be(head.session_id))) +- } +-} +- +-#[cfg(not(target_os = "windows"))] +-pub fn usb_write_all(fd: i32, data: Vec) -> io::Result { +- let buf = SerializedBuffer { +- ptr: data.as_ptr() as *const libc::c_char, +- size: data.len() as u64, +- }; +- let ret = unsafe { WriteUsbDevEx(fd, buf) } as i32; +- if ret < 0 { +- Err(utils::error_other("usb write failed".to_string())) +- } else { +- Ok(ret) +- } +-} +-impl base::Writer for UsbWriter { +- // 屏蔽window编译报错 +- #[cfg(not(target_os = "windows"))] +- #[allow(unused)] +- fn write_all(&self, data: Vec) -> io::Result { +- let buf = SerializedBuffer { +- ptr: data.as_ptr() as *const libc::c_char, +- size: data.len() as u64, +- }; +- let ret = unsafe { WriteUsbDevEx(self.fd, buf) } as i32; +- if ret < 0 { +- Err(utils::error_other("usb write failed".to_string())) +- } else { +- Ok(ret) +- } +- } +- +- // 屏蔽window编译报错 +- #[cfg(target_os = "windows")] +- fn write_all(&self, _data: Vec) -> io::Result { +- Ok(0) +- } +-} +- +-pub fn build_header(session_id: u32, option: u8, length: usize) -> Vec { +- UsbHead { +- session_id: u32::to_be(session_id), +- flag: [config::USB_PACKET_FLAG[0], config::USB_PACKET_FLAG[1]], +- option, +- data_size: u32::to_be(length as u32), +- } +- .serialize() +-} diff --git a/todo_list/file_remote_transfer/todo.md b/todo_list/file_remote_transfer/todo.md new file mode 100644 index 0000000000000000000000000000000000000000..1c0d2ad0f3e0a0f67d72dd06c2e0ee41aed62de7 --- /dev/null +++ b/todo_list/file_remote_transfer/todo.md @@ -0,0 +1,32 @@ + +[√]编译:携带了host目录调整的patch +[√]本地cargo rust版本验证 +[√]兼容性验证--cpp file send (client)命令字大小端处理异常 +[-]大于4G文件 +[-]跨平台文件 +[-]跨网段传输 +[-]原有local传输验证 +[√]性能评估 + +rust server性能较差 +rust server + rust client +hdc_rust -s 127.0.0.1:9710 file send D:\poreject\02_function_scripts\resource\asd /data/ +FileTransfer finish, Size:210535621, File count = 1, time:3724ms rate:56534.81kB/s + +rust server + cpp client +hdc -s 127.0.0.1:9710 file send D:\poreject\02_function_scripts\resource\asd /data/ +FileTransfer finish, Size:210535621, File count = 1, time:3730ms rate:56443.87kB/s + +cpp server + cpp client +hdc -s 127.0.0.1:8710 file send D:\poreject\02_function_scripts\resource\asd /data/ +FileTransfer finish, Size:210535621, File count = 1, time:3176ms rate:66289.55kB/s + +cpp server + rust client +hdc_rust -s 127.0.0.1:8710 file send D:\poreject\02_function_scripts\resource\asd /data/ +FileTransfer finish, Size:210535621, File count = 1, time:3127ms rate:67328.31kB/s + + +TODO +1、TcpMap 目前为混用channel_id和session_id版本,需要针对解耦版本重新适配 +2、针对断点续传预埋接口 +3、兼容方案