diff --git a/frameworks/native/sensor/BUILD.gn b/frameworks/native/sensor/BUILD.gn index 1ad00abcfaa20b671b98d2acec128fabd2a31c9c..df392a2cf16c4e24507bb2e0b9b38aa0ddf55a53 100755 --- a/frameworks/native/sensor/BUILD.gn +++ b/frameworks/native/sensor/BUILD.gn @@ -33,11 +33,17 @@ ohos_shared_library("libsensor_native") { "$SUBSYSTEM_DIR/utils/ipc/include", ] + defines = sensor_default_defines + deps = [ "$SUBSYSTEM_DIR/utils/common:libsensor_utils", "$SUBSYSTEM_DIR/utils/ipc:libsensor_ipc", ] + if (rust_socket_ipc) { + deps += [ "$SUBSYSTEM_DIR/rust/utils/socket_ipc_rust_ffi:sensor_rust_util_ffi" ] + } + external_deps = [ "c_utils:utils", "eventhandler:libeventhandler", diff --git a/frameworks/native/sensor/include/sensor_service_client.h b/frameworks/native/sensor/include/sensor_service_client.h index 0bcc85c8e34ebc9533d1aa0d42c2c3aa07f06943..e544c640c4c50f5b79dbd80ed085ed04389535a0 100755 --- a/frameworks/native/sensor/include/sensor_service_client.h +++ b/frameworks/native/sensor/include/sensor_service_client.h @@ -53,13 +53,13 @@ public: int32_t ResetSensors(); void ReceiveMessage(const char *buf, size_t size); void Disconnect(); + void HandleNetPacke(NetPacket &pkt); private: int32_t InitServiceClient(); void UpdateSensorInfoMap(int32_t sensorId, int64_t samplingPeriod, int64_t maxReportDelay); void DeleteSensorInfoItem(int32_t sensorId); int32_t CreateSocketChannel(); - void HandleNetPacke(NetPacket &pkt); std::mutex clientMutex_; sptr serviceDeathObserver_; sptr sensorServer_; diff --git a/frameworks/native/sensor/src/sensor_service_client.cpp b/frameworks/native/sensor/src/sensor_service_client.cpp index 3bf51d50a3155ddfbce516f0c70c0e91ebd08f76..20e73697e2e544e8a05d7a94eb67e4325c911653 100755 --- a/frameworks/native/sensor/src/sensor_service_client.cpp +++ b/frameworks/native/sensor/src/sensor_service_client.cpp @@ -27,6 +27,7 @@ #include "sensor_service_proxy.h" #include "sensors_errors.h" #include "system_ability_definition.h" +#include "rust_binding.h" namespace OHOS { namespace Sensors { @@ -36,6 +37,18 @@ namespace { constexpr HiLogLabel LABEL = { LOG_CORE, SENSOR_LOG_DOMAIN, "SensorServiceClient" }; constexpr int32_t GET_SERVICE_MAX_COUNT = 30; constexpr uint32_t WAIT_MS = 200; +#ifdef OHOS_BUILD_ENABLE_RUST +extern "C" { + void ReadClientPackets(RustStreamBuffer*, OHOS::Sensors::SensorServiceClient*, + void(*)(OHOS::Sensors::SensorServiceClient*, RustNetPacket*)); + void OnPacket(SensorServiceClient* object, RustNetPacket* cPkt) + { + NetPacket pkt(cPkt->msgId); + pkt.streamBufferPtr_.reset(cPkt->streamBuffer); + object->HandleNetPacke(pkt); + } +} +#endif // OHOS_BUILD_ENABLE_RUST } // namespace SensorServiceClient::~SensorServiceClient() @@ -357,7 +370,11 @@ void SensorServiceClient::ReceiveMessage(const char *buf, size_t size) if (!circBuf_.Write(buf, size)) { SEN_HILOGE("Write data failed. size:%{public}zu", size); } +#ifdef OHOS_BUILD_ENABLE_RUST + ReadClientPackets(circBuf_.streamBufferPtr_.get(), this, OnPacket); +#else OnReadPackets(circBuf_, std::bind(&SensorServiceClient::HandleNetPacke, this, std::placeholders::_1)); +#endif // OHOS_BUILD_ENABLE_RUST } void SensorServiceClient::HandleNetPacke(NetPacket &pkt) @@ -370,7 +387,11 @@ void SensorServiceClient::HandleNetPacke(NetPacket &pkt) SensorActiveInfo sensorActiveInfo; pkt >> sensorActiveInfo.pid >> sensorActiveInfo.sensorId >> sensorActiveInfo.samplingPeriodNs >> sensorActiveInfo.maxReportDelayNs; +#ifdef OHOS_BUILD_ENABLE_RUST + if (StreamBufferChkRWError(pkt.streamBufferPtr_.get())) { +#else if (pkt.ChkRWError()) { +#endif // OHOS_BUILD_ENABLE_RUST SEN_HILOGE("Packet read type failed"); return; } @@ -385,13 +406,14 @@ void SensorServiceClient::HandleNetPacke(NetPacket &pkt) void SensorServiceClient::Disconnect() { CALL_LOG_ENTER; - if (fd_ < 0) { + int32_t fd = GetFd(); + if (fd < 0) { return; } CHKPV(dataChannel_); - int32_t ret = dataChannel_->DelFdListener(fd_); + int32_t ret = dataChannel_->DelFdListener(fd); if (ret != ERR_OK) { - SEN_HILOGE("Delete fd listener failed, fd:%{public}d, ret:%{public}d", fd_, ret); + SEN_HILOGE("Delete fd listener failed, fd:%{public}d, ret:%{public}d", fd, ret); } Close(); } @@ -414,12 +436,16 @@ int32_t SensorServiceClient::CreateSocketChannel() SEN_HILOGE("Create socket channel failed, ret:%{public}d", ret); return ret; } +#ifdef OHOS_BUILD_ENABLE_RUST + StreamSocketSetFd(streamSocketPtr_.get(), clientFd); +#else fd_ = clientFd; - if (dataChannel_->AddFdListener(fd_, +#endif // OHOS_BUILD_ENABLE_RUST + if (dataChannel_->AddFdListener(GetFd(), std::bind(&SensorServiceClient::ReceiveMessage, this, std::placeholders::_1, std::placeholders::_2), std::bind(&SensorServiceClient::Disconnect, this)) != ERR_OK) { Close(); - SEN_HILOGE("Add fd listener failed, fd:%{public}d", fd_); + SEN_HILOGE("Add fd listener failed, fd:%{public}d", GetFd()); return ERROR; } StartTrace(HITRACE_TAG_SENSORS, "EnableActiveInfoCB"); diff --git a/rust/utils/socket_ipc_rust_ffi/BUILD.gn b/rust/utils/socket_ipc_rust_ffi/BUILD.gn new file mode 100644 index 0000000000000000000000000000000000000000..aa00862de0ae660dc164fa59599642a6eb568087 --- /dev/null +++ b/rust/utils/socket_ipc_rust_ffi/BUILD.gn @@ -0,0 +1,31 @@ +# Copyright (C) 2022 Huawei Device Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import("//build/ohos.gni") + +ohos_rust_shared_ffi("sensor_rust_util_ffi") { + sources = [ "src/lib.rs" ] + + external_deps = [ + "c_utils:utils", + "hiviewdfx_hilog_native:hilog_rust", + "hiviewdfx_hilog_native:libhilog", + ] + + crate_name = "sensor_rust_util_ffi" + crate_type = "cdylib" + install_images = [ system_base_dir ] + + part_name = "sensor" + subsystem_name = "sensors" +} diff --git a/rust/utils/socket_ipc_rust_ffi/src/binding.rs b/rust/utils/socket_ipc_rust_ffi/src/binding.rs new file mode 100644 index 0000000000000000000000000000000000000000..4801fdb34787cf2e98a9fba10f2006c5c30d035d --- /dev/null +++ b/rust/utils/socket_ipc_rust_ffi/src/binding.rs @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2023 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + // C interface for socket core object +use libc::c_void; +/// C StreamServer struct pointer +#[repr(C)] +pub struct CStreamServer { + _private: [u8; 0], +} +/// C Client struct pointer +#[repr(C)] +pub struct CSensorServiceClient { + _private: [u8; 0], +} + +extern "C" { + /// extern safe C function + pub fn memcpy_s(dest: *mut c_void, dest_size: libc::size_t, src: *const c_void, count: libc::size_t) -> i32; + /// extern safe C function + pub fn memset_s(dest: *mut c_void, dest_size: libc::size_t, ch: libc::c_int, count: libc::size_t) -> i32; +} + diff --git a/rust/utils/socket_ipc_rust_ffi/src/epoll_manager.rs b/rust/utils/socket_ipc_rust_ffi/src/epoll_manager.rs new file mode 100644 index 0000000000000000000000000000000000000000..096e27f6ca9b2b3b370c94ac783669ebea0d865f --- /dev/null +++ b/rust/utils/socket_ipc_rust_ffi/src/epoll_manager.rs @@ -0,0 +1,152 @@ +/* + * Copyright (C) 2023 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// provide C interface to C++ for calling +pub mod ffi; +use hilog_rust::{info, error, hilog, HiLogLabel, LogType}; +use std::{ptr, ffi::{CString, c_char}}; +use libc::c_int; +const ONCE_PROCESS_NETPACKET_LIMIT: i32 = 100; +const MAX_PACKET_BUF_SIZE: usize = 256; +const SEND_RETRY_SLEEP_TIME: u64 = 10000; +const SEND_RETRY_LIMIT: i32 = 32; +const RET_ERR: i32 = -1; + +const LOG_LABEL: HiLogLabel = HiLogLabel { + log_type: LogType::LogCore, + domain: 0xD002220, + tag: "EpollManager" +}; + +/// struct EpollManager +#[repr(C)] +pub struct EpollManager { + /// socket_fd + pub socket_fd: i32, + /// epoll_fd + pub epoll_fd: i32, +} + +impl Default for EpollManager { + fn default() -> Self { + Self { + socket_fd: -1, + epoll_fd: -1, + } + } +} + +impl EpollManager { + /// return const referance of self + /// + /// # Safety + /// + /// Makesure object is null pointer + unsafe fn as_ref<'a>(object: *const Self) -> Option<&'a Self>{ + object.as_ref() + } + /// return mutable referance of self + /// + /// # Safety + /// + /// Makesure object is null pointer + unsafe fn as_mut<'a>(object: *mut Self) -> Option<&'a mut Self>{ + object.as_mut() + } + /// get socket_fd + pub fn socket_fd(&self) -> i32 { + self.socket_fd + } + /// get epoll_fd + pub fn epoll_fd(&self) -> i32 { + self.epoll_fd + } + /// adjust whether epoll_fd is valid or not + pub fn is_valid_epoll(&self) -> bool { + self.epoll_fd > 0 + } + /// create epoll_fd + pub fn create_epoll_fd(&mut self, size: i32) { + let epoll_fd; + unsafe { + epoll_fd = libc::epoll_create(size); + } + if epoll_fd < 0 { + error!(LOG_LABEL, "epoll_create return {}", epoll_fd); + } else { + self.epoll_fd = epoll_fd; + info!(LOG_LABEL, "epoll_create, epoll_fd:{}", epoll_fd); + } + } + /// event is null pointer + pub fn epoll_ctl(fd: i32, op: i32, event: *mut libc::epoll_event, epoll_fd: i32) -> i32 { + if event.is_null() { + error!(LOG_LABEL, "event is nullptr"); + return RET_ERR; + } + if op == libc::EPOLL_CTL_DEL { + // safety: call unsafe function + unsafe { + libc::epoll_ctl(epoll_fd as c_int, op as c_int, fd as c_int, ptr::null_mut()) + } + } else { + // safety: call unsafe function + unsafe { + libc::epoll_ctl(epoll_fd as c_int, op as c_int, fd as c_int, event as *mut libc::epoll_event) + } + } + } + /// epoll_wait + /// + /// # Safety + /// + /// call libc + pub unsafe fn epoll_wait(events: *mut libc::epoll_event, maxevents: i32, timeout: i32, epoll_fd: i32) -> i32 { + let ret = libc::epoll_wait( + epoll_fd as c_int, events as *mut libc::epoll_event, maxevents as c_int, timeout as c_int); + if ret < 0 { + let errno = *libc::__errno_location(); + error!(LOG_LABEL, "epoll_wait ret:{},errno:{}", ret, errno); + } + ret + } + /// epoll close + pub fn epoll_close(&mut self) { + if self.epoll_fd >= 0 { + unsafe { + libc::close(self.epoll_fd as c_int); + } + self.epoll_fd = -1; + } + } + /// socket_set_fd + pub fn socket_set_fd(&mut self, fd: i32) { + self.socket_fd = fd + } + /// socket close + pub fn socket_close(&mut self) { + if self.socket_fd >= 0 { + unsafe { + let res = libc::close(self.socket_fd as c_int); + if res > 0 { + error!(LOG_LABEL, "Socket close failed res:{}", res); + } + } + self.socket_fd = -1; + } + } +} + + diff --git a/rust/utils/socket_ipc_rust_ffi/src/epoll_manager/ffi.rs b/rust/utils/socket_ipc_rust_ffi/src/epoll_manager/ffi.rs new file mode 100644 index 0000000000000000000000000000000000000000..241336452e42d2b6b6f23ae16f3d8b85b26c922c --- /dev/null +++ b/rust/utils/socket_ipc_rust_ffi/src/epoll_manager/ffi.rs @@ -0,0 +1,196 @@ +/* + * Copyright (C) 2023 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use super::*; +use hilog_rust::{info, error, hilog, HiLogLabel, LogType}; +use crate::{error::SocketStatusCode, epoll_manager::EpollManager}; +use std::mem::drop; +const LOG_LABEL: HiLogLabel = HiLogLabel { + log_type: LogType::LogCore, + domain: 0xD002220, + tag: "stream_socket_ffi" +}; +const RET_ERR: i32 = -1; + +/// create unique_ptr of stream_socket for C++ +/// +/// # Safety +/// +/// object must be valid +#[no_mangle] +pub unsafe extern "C" fn StreamSocketCreate() -> *mut EpollManager { + let epoll_manager: Box:: = Box::default(); + Box::into_raw(epoll_manager) +} +/// drop unique_ptr of stream_socket for C++ +/// +/// # Safety +/// +/// object must be valid +#[no_mangle] +pub unsafe extern "C" fn StreamSocketDelete(raw: *mut EpollManager) { + if !raw.is_null() { + drop(Box::from_raw(raw)); + } +} +/// Get Fd +/// +/// # Safety +/// +/// object must be valid +#[no_mangle] +pub unsafe extern "C" fn StreamSocketGetFd(object: *const EpollManager) -> i32 { + info!(LOG_LABEL, "enter StreamSocketGetFd"); + if let Some(obj) = EpollManager::as_ref(object) { + obj.socket_fd() + } else { + SocketStatusCode::FdFail.into() + } +} + +/// Get Fd +/// +/// # Safety +/// +/// object must be valid +#[no_mangle] +pub unsafe extern "C" fn StreamSocketGetEpollFd(object: *const EpollManager) -> i32 { + info!(LOG_LABEL, "enter StreamSocketGetEpollFd"); + if let Some(obj) = EpollManager::as_ref(object) { + obj.epoll_fd() + } else { + SocketStatusCode::EpollFdFail.into() + } +} + +/// Get Fd +/// +/// # Safety +/// +/// object must be valid +#[no_mangle] +pub unsafe extern "C" fn StreamSocketEpollCreate(object: *mut EpollManager, size: i32) -> i32 { + info!(LOG_LABEL, "enter StreamSocketEpollCreate"); + if let Some(obj) = EpollManager::as_mut(object) { + obj.create_epoll_fd(size); + obj.epoll_fd() + } else { + SocketStatusCode::EpollCreateFail.into() + } +} + +/// StreamSocketEpollCtl +/// +/// # Safety +/// +/// object must be valid +#[no_mangle] +pub unsafe extern "C" fn StreamSocketEpollCtl(object: *const EpollManager, fd: i32, op: i32, + event: *mut libc::epoll_event, epoll_fd: i32) -> i32 { + info!(LOG_LABEL, "enter StreamSocketEpollCtl"); + if let Some(obj) = EpollManager::as_ref(object) { + if fd < 0 { + error!(LOG_LABEL, "Invalid fd"); + return RET_ERR + } + let epoll_fd = + if epoll_fd < 0 { + if obj.is_valid_epoll(){ + obj.epoll_fd() + } else { + return RET_ERR; + } + } else { + epoll_fd + }; + EpollManager::epoll_ctl(fd, op, event, epoll_fd) + } else { + SocketStatusCode::EpollCtlFail.into() + } +} + +/// StreamSocketEpollWait +/// +/// # Safety +/// +/// object must be valid +#[no_mangle] +pub unsafe extern "C" fn StreamSocketEpollWait( + object: *const EpollManager, events: *mut libc::epoll_event, maxevents: i32, timeout: i32, epoll_fd: i32) -> i32 { + info!(LOG_LABEL, "enter StreamSocketEpollWait"); + if let Some(obj) = EpollManager::as_ref(object) { + let epoll_fd = + if epoll_fd < 0 { + if obj.is_valid_epoll() { + obj.epoll_fd() + } else { + return RET_ERR; + } + } else { + epoll_fd + }; + EpollManager::epoll_wait(events, maxevents, timeout, epoll_fd) + } else { + SocketStatusCode::EpollWaitFail.into() + } +} + +/// StreamSocketEpollClose +/// +/// # Safety +/// +/// object must be valid +#[no_mangle] +pub unsafe extern "C" fn StreamSocketEpollClose(object: *mut EpollManager) -> i32 { + info!(LOG_LABEL, "enter StreamSocketEpollClose"); + if let Some(obj) = EpollManager::as_mut(object) { + obj.epoll_close(); + SocketStatusCode::Ok.into() + } else { + SocketStatusCode::EpollCloseFail.into() + } +} +/// StreamSocketClose +/// +/// # Safety +/// +/// object must be valid +#[no_mangle] +pub unsafe extern "C" fn StreamSocketClose(object: *mut EpollManager) -> i32 { + info!(LOG_LABEL, "enter StreamSocketClose"); + if let Some(obj) = EpollManager::as_mut(object) { + obj.socket_close(); + SocketStatusCode::Ok.into() + } else { + SocketStatusCode::SocketCloseFail.into() + } +} +/// StreamSocketSetFd +/// +/// # Safety +/// +/// object must be valid +#[no_mangle] +pub unsafe extern "C" fn StreamSocketSetFd(object: *mut EpollManager, fd: i32) -> i32 { + info!(LOG_LABEL, "enter StreamSocketSetFd"); + if let Some(obj) = EpollManager::as_mut(object) { + obj.socket_set_fd(fd); + SocketStatusCode::Ok.into() + } else { + SocketStatusCode::SocketSetFdFail.into() + } +} + + diff --git a/rust/utils/socket_ipc_rust_ffi/src/error.rs b/rust/utils/socket_ipc_rust_ffi/src/error.rs new file mode 100644 index 0000000000000000000000000000000000000000..7dbf117196625d1049cb97c43db7d6c73eaaded3 --- /dev/null +++ b/rust/utils/socket_ipc_rust_ffi/src/error.rs @@ -0,0 +1,144 @@ +/* + * Copyright (C) 2023 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +pub enum SocketStatusCode { + Ok = 0, + Fail = -1, + FdFail = -2, + EpollFdFail = -3, + EpollCreateFail = -4, + EpollCtlFail = -5, + EpollWaitFail = -6, + EpollCloseFail = -7, + SocketCloseFail = -8, + SocketSetFdFail = -9, +} + +pub enum BufferStatusCode { + Ok = 0, + Fail = -1, + ResetFail = -2, + CleanFail = -3, + UnreadSizeFail = -4, + IsEmptyFail = -5, + WriteStreamBufferFail = -6, + ReadStreamBufferFail = -7, + CheckRWErrorFail = -8, + CopyDataToBeginFail = -9, + ReadCharUsizeFail = -10, + ReadServerPacketsFail = -11, + ReadClientPacketsFail = -12, + SizeFail = -13, + RcountFail = -14, + WcountFail = -15, + WposFail = -16, + RposFail = -17, + SetRwErrStatusFail = -18, + SetRposFail = -19, +} + +pub enum SessionStatusCode { + Ok = 0, + Fail = -1, + UidFail = -2, + PidFail = -3, + ModuleTypeFail = -4, + FdFail = -5, + SetTokenTypeFail = -6, + TokenTypeFail = -7, + CloseFail = -8, + SetUidFail = -9, + SetFdFail = -10, + SetPidFail = -11, +} + +pub enum NetPacketStatusCode { + Ok = 0, + Fail = -1, + PacketLengthFail = -2, +} + +impl From for i32 { + fn from(code: SocketStatusCode) -> i32 { + match code { + SocketStatusCode::Ok => 0, + SocketStatusCode::FdFail => -2, + SocketStatusCode::EpollFdFail => -3, + SocketStatusCode::EpollCreateFail => -4, + SocketStatusCode::EpollCtlFail => -5, + SocketStatusCode::EpollWaitFail => -6, + SocketStatusCode::EpollCloseFail => -7, + SocketStatusCode::SocketCloseFail => -8, + SocketStatusCode::SocketSetFdFail => -9, + _ => -1, + } + } +} + +impl From for i32 { + fn from(code: BufferStatusCode) -> i32 { + match code { + BufferStatusCode::Ok => 0, + BufferStatusCode::ResetFail => -2, + BufferStatusCode::CleanFail => -3, + BufferStatusCode::UnreadSizeFail => -4, + BufferStatusCode::IsEmptyFail => -5, + BufferStatusCode::WriteStreamBufferFail => -6, + BufferStatusCode::ReadStreamBufferFail => -7, + BufferStatusCode::CheckRWErrorFail => -8, + BufferStatusCode::CopyDataToBeginFail => -9, + BufferStatusCode::ReadCharUsizeFail => -10, + BufferStatusCode::ReadServerPacketsFail => -11, + BufferStatusCode::ReadClientPacketsFail => -12, + BufferStatusCode::SizeFail => -13, + BufferStatusCode::RcountFail => -14, + BufferStatusCode::WcountFail => -15, + BufferStatusCode::WposFail => -16, + BufferStatusCode::RposFail => -17, + BufferStatusCode::SetRwErrStatusFail => -18, + BufferStatusCode::SetRposFail => -19, + _ => -1, + } + } +} + +impl From for i32 { + fn from(code: SessionStatusCode) -> i32 { + match code { + SessionStatusCode::Ok => 0, + SessionStatusCode::UidFail => -2, + SessionStatusCode::PidFail => -3, + SessionStatusCode::ModuleTypeFail => -4, + SessionStatusCode::FdFail => -5, + SessionStatusCode::SetTokenTypeFail => -6, + SessionStatusCode::TokenTypeFail => -7, + SessionStatusCode::CloseFail => -8, + SessionStatusCode::SetUidFail => -9, + SessionStatusCode::SetFdFail => -10, + SessionStatusCode::SetPidFail => -11, + _ => -1, + } + } +} + +impl From for i32 { + fn from(code: NetPacketStatusCode) -> i32 { + match code { + NetPacketStatusCode::Ok => 0, + NetPacketStatusCode::PacketLengthFail => -2, + _ => -1, + } + } +} \ No newline at end of file diff --git a/rust/utils/socket_ipc_rust_ffi/src/lib.rs b/rust/utils/socket_ipc_rust_ffi/src/lib.rs new file mode 100644 index 0000000000000000000000000000000000000000..1663472713c401b3fee252ecdbbb70daef26e0f6 --- /dev/null +++ b/rust/utils/socket_ipc_rust_ffi/src/lib.rs @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2023 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//! Safe Rust interface to OHOS msdp +#![feature(rustc_private)] +#![allow(dead_code)] + +extern crate libc; +mod epoll_manager; +mod stream_buffer; +mod stream_session; +mod net_packet; +mod binding; +mod error; +/// annotation +pub type Result = std::result::Result; + diff --git a/rust/utils/socket_ipc_rust_ffi/src/net_packet.rs b/rust/utils/socket_ipc_rust_ffi/src/net_packet.rs new file mode 100644 index 0000000000000000000000000000000000000000..7e23a996f6761621cd179f63835f1af93d53c63f --- /dev/null +++ b/rust/utils/socket_ipc_rust_ffi/src/net_packet.rs @@ -0,0 +1,173 @@ +/* + * Copyright (C) 2023 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::ffi::{CString, c_char}; +use hilog_rust::{hilog, error, HiLogLabel, LogType}; +use std::mem::size_of; +use crate::stream_buffer::StreamBuffer; +const STREAM_BUF_WRITE_FAIL: i32 = 2; +const LOG_LABEL: HiLogLabel = HiLogLabel { + log_type: LogType::LogCore, + domain: 0xD002220, + tag: "NetPacket" +}; + +/// struct PackHead +#[repr(packed(1))] +#[repr(C)] +pub struct PackHead { + /// NetPacket type + pub id_msg: MessageId, + /// SufferBuffer size + pub size: usize, +} + +/// NetPacket type +#[derive(Copy, Clone)] +#[repr(C)] +pub enum MessageId { + /// + Invalid = 0, + /// + Device, + /// + DeviceIds, + /// + DeviceSupportKeys, + /// + AddDeviceListener, + /// + DeviceKeyboardType, + /// + DisplayInfo, + /// + NoticeAnr, + /// + MarkProcess, + /// + OnSubscribeKey, + /// + OnKeyEvent, + /// + OnPointerEvent, + /// + ReportKeyEvent, + /// + ReportPointerEvent, + /// + OnDeviceAdded, + /// + OnDeviceRemoved, + /// + CoordinationAddListener, + /// + CoordinationMessage, + /// + CoordinationGetState, + + /// + DragNotifyResult, + /// + DragStateListener, +} + +/// struct NetPacket +#[derive(Copy, Clone)] +#[repr(C)] +pub struct NetPacket { + /// NetPacket head + pub msg_id: MessageId, + /// NetPacket stream_buffer + pub stream_buffer: StreamBuffer, +} + +/// struct CNetPacket +#[derive(Copy, Clone)] +#[repr(C)] +pub struct CNetPacket { + /// NetPacket head + pub msg_id: MessageId, + /// NetPacket stream_buffer_ptr + pub stream_buffer_ptr: *const StreamBuffer, +} + +impl Default for NetPacket { + fn default() -> Self { + Self { + msg_id: MessageId::Invalid, + stream_buffer: Default::default(), + } + } +} + +impl NetPacket { + /// get refenrance from pointer + /// + ///# Safety + /// + /// object pointer is valid + pub unsafe fn as_ref<'a>(object: *const Self) -> Option<&'a Self>{ + object.as_ref() + } + /// get mut refenrance from pointer + /// + ///# Safety + /// + /// object pointer is valid + pub unsafe fn as_mut<'a>(object: *mut Self) -> Option<&'a mut Self>{ + object.as_mut() + } + /// write + pub fn write(&mut self, data: T) { + let data: *const c_char = &data as *const T as *const c_char; + let size = size_of::(); + self.stream_buffer.write_char_usize(data, size); + } + /// read + pub fn read(&mut self, data: &mut T) { + let data: *mut c_char = data as *mut T as *mut c_char; + let size = size_of::(); + self.stream_buffer.read_char_usize(data, size); + } + /// get_size + pub fn size(&self) -> usize { + self.stream_buffer.size() + } + /// get_packet_length + pub fn get_packet_length(&self) -> usize { + size_of::() + self.stream_buffer.w_pos + } + /// get_data + pub fn get_data(&self) -> *const c_char { + self.stream_buffer.data() + } + /// get_msg_id + pub fn get_msg_id(&self) -> MessageId { + self.msg_id + } + /// make_data + pub fn make_data(&self, buf: &mut StreamBuffer) { + let head = PackHead { + id_msg: self.msg_id, + size: self.stream_buffer.w_pos, + }; + buf.write(head); + if self.stream_buffer.w_pos > 0 && !buf.write_char_usize(&self.stream_buffer.sz_buff[0] as *const c_char, + self.stream_buffer.w_pos) { + error!(LOG_LABEL, "Write data to stream failed, errCode:{}", STREAM_BUF_WRITE_FAIL); + } + } +} + diff --git a/rust/utils/socket_ipc_rust_ffi/src/stream_buffer.rs b/rust/utils/socket_ipc_rust_ffi/src/stream_buffer.rs new file mode 100644 index 0000000000000000000000000000000000000000..f93dfa4b6574e70c9502c3fd29f2e9e8db655340 --- /dev/null +++ b/rust/utils/socket_ipc_rust_ffi/src/stream_buffer.rs @@ -0,0 +1,390 @@ +/* + * Copyright (C) 2023 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// provide C interface to C++ for calling +pub mod ffi; +use hilog_rust::{info, error, hilog, debug, HiLogLabel, LogType}; +use std::ffi::{CString, c_char}; +use crate::binding; +use std::mem::size_of; +use crate::binding::CSensorServiceClient; +use crate::net_packet::{NetPacket, CNetPacket}; +use crate::net_packet::PackHead; +type ErrorStatus = crate::stream_buffer::ErrStatus; +/// function pointer alias +pub type ClientPacketCallBackFun = unsafe extern "C" fn ( + client: *const CSensorServiceClient, + pkt: *const CNetPacket, +); + +const ONCE_PROCESS_NETPACKET_LIMIT: i32 = 100; +const MAX_STREAM_BUF_SIZE: usize = 256; +/// max buffer size of packet +pub const MAX_PACKET_BUF_SIZE: usize = 256; +const PARAM_INPUT_INVALID: i32 = 5; +const MEM_OUT_OF_BOUNDS: i32 = 3; +const MEMCPY_SEC_FUN_FAIL: i32 = 4; +const STREAM_BUF_READ_FAIL: i32 = 1; +const MAX_VECTOR_SIZE: i32 = 10; +const LOG_LABEL: HiLogLabel = HiLogLabel { + log_type: LogType::LogCore, + domain: 0xD002220, + tag: "StreamBuffer" +}; + +/// enum errstatus +#[derive(Copy, Clone, PartialEq)] +#[repr(C)] +pub enum ErrStatus { + /// status ok + Ok = 0, + /// readerror + Read = 1, + /// writeerror + Write = 2, +} + +/// struct streambuffer +#[derive(Copy, Clone)] +#[repr(C)] +pub struct StreamBuffer { + /// error status of read or write + pub rw_error_status: ErrorStatus, + /// read count + pub r_count: usize, + /// write count + pub w_count: usize, + /// read position + pub r_pos: usize, + /// write position + pub w_pos: usize, + /// buffer of read or write + pub sz_buff: [c_char; MAX_STREAM_BUF_SIZE + 1], +} + +impl Default for StreamBuffer { + fn default() -> Self { + Self { + rw_error_status: ErrorStatus::Ok, + r_count: 0, + w_count: 0, + r_pos: 0, + w_pos: 0, + sz_buff: [0; MAX_STREAM_BUF_SIZE + 1], + } + } +} + + +impl StreamBuffer { + /// return const referance of self + /// + /// # Safety + /// + /// Makesure object is null pointer + unsafe fn as_ref<'a>(object: *const Self) -> Option<&'a Self>{ + object.as_ref() + } + /// return mutable referance of self + /// + /// # Safety + /// + /// Makesure object is null pointer + unsafe fn as_mut<'a>(object: *mut Self) -> Option<&'a mut Self>{ + object.as_mut() + } + + /// write + pub fn write(&mut self, data: T) { + let data: *const c_char = &data as *const T as *const c_char; + let size = size_of::(); + self.write_char_usize(data, size); + } + + fn reset(&mut self) { + self.r_pos = 0; + self.w_pos = 0; + self.r_count = 0; + self.w_count = 0; + self.rw_error_status = ErrorStatus::Ok; + } + + fn clean(&mut self) { + self.reset(); + let size = MAX_STREAM_BUF_SIZE + 1; + let reference = &(self.sz_buff); + let pointer = reference as *const c_char; + unsafe { + let ret = binding::memset_s(pointer as *mut libc::c_void, size, 0, size); + if ret != 0 { + error!(LOG_LABEL, "Call memset_s fail"); + } + } + } + + fn seek_read_pos(&mut self, n: usize) -> bool { + let pos: usize = self.r_pos + n; + if pos > self.w_pos { + error!(LOG_LABEL, "The position in the calculation is not as expected. pos:{} [0, {}]", + pos, self.w_pos); + return false; + } + self.r_pos = pos; + true + } + /// write buffer + pub fn write_buf(&self) -> *const c_char { + info!(LOG_LABEL, "enter write_buf"); + &self.sz_buff[self.w_pos] as *const c_char + } + /// unread size + pub fn unread_size(&self) -> usize { + if self.w_pos <= self.r_pos { + 0 + } else { + self.w_pos - self.r_pos + } + } + + fn is_empty(&self) -> bool { + self.r_pos == self.w_pos + } + + fn write_streambuffer(&mut self, buf: &Self) -> bool { + self.write_char_usize(buf.data(), buf.size()) + } + fn read_streambuffer(&self, buf: &mut Self) -> bool { + buf.write_char_usize(self.data(), self.size()) + } + /// data function + pub fn data(&self) -> *const c_char { + &(self.sz_buff[0]) as *const c_char + } + /// size function + pub fn size(&self) -> usize { + self.w_pos + } + /// check error status of read or write + pub fn chk_rwerror(&self) -> bool { + self.rw_error_status != ErrorStatus::Ok + } + fn get_available_buf_size(&self) -> usize { + if self.w_pos >= MAX_STREAM_BUF_SIZE { + 0 + } else { + MAX_STREAM_BUF_SIZE - self.w_pos + } + } + fn get_error_status_remark(&self) -> *const c_char { + let s = match self.rw_error_status { + ErrorStatus::Ok => "OK\0", + ErrorStatus::Read => "READ_ERROR\0", + ErrorStatus::Write => "WRITE_ERROR\0", + }; + error!(LOG_LABEL, "rw_error_status={}", s); + s.as_ptr() + } + fn read_buf(&self) -> *const c_char { + &(self.sz_buff[self.r_pos]) as *const c_char + } + /// write buffer + pub fn write_char_usize(&mut self, buf: *const c_char, size: usize) -> bool { + if self.chk_rwerror() { + return false; + } + if buf.is_null() { + error!(LOG_LABEL, "Invalid input parameter buf=nullptr errCode:{}", PARAM_INPUT_INVALID); + self.rw_error_status = ErrorStatus::Write; + return false; + } + if size == 0 { + error!(LOG_LABEL, "Invalid input parameter size={} errCode:{}", size, PARAM_INPUT_INVALID); + self.rw_error_status = ErrorStatus::Write; + return false; + } + if (self.w_pos + size) > MAX_STREAM_BUF_SIZE { + error!(LOG_LABEL, "The write length exceeds buffer. wIdx:{} size:{} maxBufSize:{} errCode:{}", + self.w_pos, size, MAX_STREAM_BUF_SIZE, MEM_OUT_OF_BOUNDS); + self.rw_error_status = ErrorStatus::Write; + return false; + } + unsafe { + let pointer = &(self.sz_buff[0]) as *const c_char; + let ret = binding::memcpy_s(pointer.add(self.w_pos) as *mut libc::c_void, + self.get_available_buf_size(), buf as *mut libc::c_void, size); + if ret != 0 { + error!(LOG_LABEL, "Failed to call memcpy_s. ret:{}", ret); + self.rw_error_status = ErrorStatus::Write; + return false; + } + } + self.w_pos += size; + self.w_count += 1; + true + } + fn check_write(&mut self, size: usize) -> bool { + let buffer_size = size; + let mut avail_size = self.get_available_buf_size(); + if buffer_size > avail_size && self.r_pos > 0 { + self.copy_data_to_begin(); + avail_size = self.get_available_buf_size(); + } + avail_size >= buffer_size + } + fn copy_data_to_begin(&mut self) { + let unread_size = self.unread_size(); + if unread_size > 0 && self.r_pos > 0 { + for (index, value) in (self.r_pos..=self.w_pos).enumerate() { + self.sz_buff[index] = self.sz_buff[value]; + } + } + debug!(LOG_LABEL, "unread_size:{} rPos:{} wPos:{}", unread_size, self.r_pos, self.w_pos); + self.r_pos = 0; + self.w_pos = unread_size; + } + /// read buffer + pub fn read_char_usize(&mut self, buf: *const c_char, size: usize) -> bool { + if self.chk_rwerror() { + return false; + } + if buf.is_null() { + error!(LOG_LABEL, "Invalid input parameter buf=nullptr errCode:{}", PARAM_INPUT_INVALID); + self.rw_error_status = ErrorStatus::Read; + return false; + } + if size == 0 { + error!(LOG_LABEL, "Invalid input parameter size={} errCode:{}", size, PARAM_INPUT_INVALID); + self.rw_error_status = ErrorStatus::Read; + return false; + } + if (self.r_pos + size) > self.w_pos { + error!(LOG_LABEL, "Memory out of bounds on read... errCode:{}", MEM_OUT_OF_BOUNDS); + self.rw_error_status = ErrorStatus::Read; + return false; + } + unsafe { + let ret = binding::memcpy_s(buf as *mut libc::c_void, size, self.read_buf() as *const libc::c_void, size); + if ret != 0 { + error!(LOG_LABEL, "Failed to call memcpy_s. ret:{}", ret); + self.rw_error_status = ErrorStatus::Read; + return false; + } + } + self.r_pos += size; + self.r_count += 1; + true + } + /// circle write + pub fn circle_write(&mut self, buf: *const c_char, size: usize) -> bool { + if !self.check_write(size) { + error!(LOG_LABEL, "Out of buffer memory, availableSize:{}, size:{}, unreadSize:{}, rPos:{}, wPos:{}", + self.get_available_buf_size(), size, self.unread_size(), + self.r_pos, self.w_pos); + return false; + } + self.write_char_usize(buf, size) + } + /// callback of client + /// + ///# Safety + /// + /// call unsafe function + pub unsafe fn read_client_packets(&mut self, client: *const CSensorServiceClient, callback_fun: ClientPacketCallBackFun) { + const HEAD_SIZE: usize = size_of::(); + for _i in 0..ONCE_PROCESS_NETPACKET_LIMIT { + let unread_size = self.unread_size(); + if unread_size < HEAD_SIZE { + break; + } + let data_size = unread_size - HEAD_SIZE; + let buf: *const c_char = self.read_buf(); + if buf.is_null() { + error!(LOG_LABEL, "buf is null, skip then break"); + break; + } + let head: *const PackHead = buf as *const PackHead; + if head.is_null() { + error!(LOG_LABEL, "head is null, skip then break"); + break; + } + let size; + let id_msg; + unsafe { + size = (*head).size; + id_msg = (*head).id_msg; + } + if !(0..=MAX_PACKET_BUF_SIZE).contains(&size) { + error!(LOG_LABEL, "Packet header parsing error, and this error cannot be recovered. \ + The buffer will be reset. size:{}, unreadSize:{}", size, unread_size); + self.reset(); + break; + } + if size > data_size { + break; + } + let mut pkt: NetPacket = NetPacket { + msg_id: id_msg, + ..Default::default() + }; + unsafe { + if size > 0 && + !pkt.stream_buffer.write_char_usize(buf.add(HEAD_SIZE) as *const c_char, size) { + error!(LOG_LABEL, "Error writing data in the NetPacket. It will be retried next time. \ + messageid:{}, size:{}", id_msg as i32, size); + break; + } + } + if !self.seek_read_pos(pkt.get_packet_length()) { + error!(LOG_LABEL, "Set read position error, and this error cannot be recovered, and the buffer \ + will be reset. packetSize:{} unreadSize:{}", pkt.get_packet_length(), unread_size); + self.reset(); + break; + } + let c_net_packet: CNetPacket = CNetPacket { + msg_id: pkt.msg_id, + stream_buffer_ptr: Box::into_raw(Box::new(pkt.stream_buffer)) + }; + unsafe { + callback_fun(client, &c_net_packet as *const CNetPacket); + } + if self.is_empty() { + self.reset(); + break; + } + } + } + fn r_count(&self) -> usize { + self.r_count + } + fn w_count(&self) -> usize { + self.w_count + } + fn w_pos(&self) -> usize { + self.w_pos + } + fn r_pos(&self) -> usize { + self.r_pos + } + fn sz_buff(&self) -> *const c_char { + &self.sz_buff[0] as *const c_char + } + fn set_rw_error_status(&mut self, rw_error_status: ErrorStatus) { + self.rw_error_status = rw_error_status + } + fn set_r_pos(&mut self, r_pos: usize) { + self.r_pos = r_pos + } +} + diff --git a/rust/utils/socket_ipc_rust_ffi/src/stream_buffer/ffi.rs b/rust/utils/socket_ipc_rust_ffi/src/stream_buffer/ffi.rs new file mode 100644 index 0000000000000000000000000000000000000000..2311d1b28f04055622ad5072567c45c40a2a8f2e --- /dev/null +++ b/rust/utils/socket_ipc_rust_ffi/src/stream_buffer/ffi.rs @@ -0,0 +1,358 @@ +/* + * Copyright (C) 2023 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use super::*; +use hilog_rust::{info, hilog, HiLogLabel, LogType}; +use crate::error::BufferStatusCode; +const LOG_LABEL: HiLogLabel = HiLogLabel { + log_type: LogType::LogCore, + domain: 0xD002220, + tag: "stream_buffer_ffi" +}; +/// create unique_ptr of stream_buffer for C++ +/// +/// # Safety +/// +/// object must be valid +#[no_mangle] +pub unsafe extern "C" fn StreamBufferCreate() -> *mut StreamBuffer { + let stream_buffer: Box:: = Box::default(); + Box::into_raw(stream_buffer) +} +/// drop unique_ptr of stream_buffer for C++ +/// +/// # Safety +/// +/// object must be valid +#[no_mangle] +pub unsafe extern "C" fn StreamBufferDelete(raw: *mut StreamBuffer) { + if !raw.is_null() { + drop(Box::from_raw(raw)); + } +} +/// data +/// +/// # Safety +/// +/// object is valid +#[no_mangle] +pub unsafe extern "C" fn StreamBufferData(object: *const StreamBuffer) -> *const c_char { + info!(LOG_LABEL, "enter data"); + if let Some(obj) = StreamBuffer::as_ref(object) { + obj.data() + } else { + std::ptr::null() + } +} +/// size +/// +/// # Safety +/// +/// object is valid +#[no_mangle] +pub unsafe extern "C" fn StreamBufferSize(object: *const StreamBuffer) -> usize { + info!(LOG_LABEL, "enter size"); + if let Some(obj) = StreamBuffer::as_ref(object) { + obj.size() + } else { + 0 + } +} +/// StreamBufferReset +/// +/// # Safety +/// +/// object is valid +#[no_mangle] +pub unsafe extern "C" fn StreamBufferReset(object: *mut StreamBuffer) -> i32 { + info!(LOG_LABEL, "enter StreamBufferReset"); + if let Some(obj) = StreamBuffer::as_mut(object) { + obj.reset(); + BufferStatusCode::Ok.into() + } else { + BufferStatusCode::ResetFail.into() + } +} +/// StreamBufferClean +/// +/// # Safety +/// +/// object is valid +#[no_mangle] +pub unsafe extern "C" fn StreamBufferClean(object: *mut StreamBuffer) -> i32 { + info!(LOG_LABEL, "enter clean"); + if let Some(obj) = StreamBuffer::as_mut(object) { + obj.clean(); + BufferStatusCode::Ok.into() + } else { + BufferStatusCode::CleanFail.into() + } +} +/// StreamBufferWrite +/// +/// # Safety +/// +/// object is valid +#[no_mangle] +pub unsafe extern "C" fn StreamBufferWrite(object: *mut StreamBuffer, buf: *const StreamBuffer) -> bool { + info!(LOG_LABEL, "enter StreamBufferWrite"); + if let Some(obj) = StreamBuffer::as_mut(object) { + if let Some(buffer) = StreamBuffer::as_ref(buf) { + obj.write_streambuffer(buffer) + } else { + false + } + } else { + false + } +} +/// StreamBufferRead +/// +/// # Safety +/// +/// object is valid +#[no_mangle] +pub unsafe extern "C" fn StreamBufferRead(object: *const StreamBuffer, buf: *mut StreamBuffer) -> bool { + info!(LOG_LABEL, "enter StreamBufferRead"); + if let Some(obj) = StreamBuffer::as_ref(object) { + if let Some(buffer) = StreamBuffer::as_mut(buf) { + obj.read_streambuffer(buffer) + } else { + false + } + } else { + false + } +} +/// StreamBufferChkRWError +/// +/// # Safety +/// +/// object is valid +#[no_mangle] +pub unsafe extern "C" fn StreamBufferChkRWError(object: *const StreamBuffer) -> bool { + if let Some(obj) = StreamBuffer::as_ref(object) { + return obj.chk_rwerror(); + } + false +} +/// StreamBufferGetErrorStatusRemark +/// +/// # Safety +/// +/// object is valid +#[no_mangle] +pub unsafe extern "C" fn StreamBufferGetErrorStatusRemark(object: *const StreamBuffer) -> *const c_char { + info!(LOG_LABEL, "enter StreamBufferGetErrorStatusRemark"); + if let Some(obj) = StreamBuffer::as_ref(object) { + obj.get_error_status_remark() + } else { + std::ptr::null() + } +} +/// StreamBufferWriteChar +/// +/// # Safety +/// +/// object is valid +#[no_mangle] +pub unsafe extern "C" fn StreamBufferWriteChar(object: *mut StreamBuffer, buf: *const c_char, size: usize) -> bool { + info!(LOG_LABEL, "enter StreamBufferWriteChar"); + if let Some(obj) = StreamBuffer::as_mut(object) { + obj.write_char_usize(buf, size) + } else { + false + } +} +/// StreamBufferCheckWrite +/// +/// # Safety +/// +/// object is valid +#[no_mangle] +pub unsafe extern "C" fn StreamBufferCheckWrite(object: *mut StreamBuffer, size: usize) -> bool { + info!(LOG_LABEL, "enter StreamBufferCheckWrite"); + if let Some(obj) = StreamBuffer::as_mut(object) { + obj.check_write(size) + } else { + false + } +} +/// CircleStreamBufferCopyDataToBegin +/// +/// # Safety +/// +/// object is valid +#[no_mangle] +pub unsafe extern "C" fn CircleStreamBufferCopyDataToBegin(object: *mut StreamBuffer) -> i32 { + info!(LOG_LABEL, "enter CircleStreamBufferCopyDataToBegin"); + if let Some(obj) = StreamBuffer::as_mut(object) { + obj.copy_data_to_begin(); + BufferStatusCode::Ok.into() + } else { + BufferStatusCode::CopyDataToBeginFail.into() + } +} +/// StreamBufferReadChar +/// +/// # Safety +/// +/// object is valid +#[no_mangle] +pub unsafe extern "C" fn StreamBufferReadChar(object: *mut StreamBuffer, buf: *const c_char, size: usize) -> bool { + info!(LOG_LABEL, "enter StreamBufferReadChar"); + if let Some(obj) = StreamBuffer::as_mut(object) { + obj.read_char_usize(buf, size) + } else { + false + } +} +/// CircleStreamBufferWrite +/// +/// # Safety +/// +/// object is valid +#[no_mangle] +pub unsafe extern "C" fn CircleStreamBufferWrite(object: *mut StreamBuffer, buf: *const c_char, size: usize) -> bool { + info!(LOG_LABEL, "enter CircleStreamBufferWrite"); + if let Some(obj) = StreamBuffer::as_mut(object) { + obj.circle_write(buf, size) + } else { + false + } +} +/// ReadClientPackets +/// +/// # Safety +/// +/// object is valid +#[no_mangle] +pub unsafe extern "C" fn ReadClientPackets(object: *mut StreamBuffer, stream_client: *const CSensorServiceClient, + callback_fun: ClientPacketCallBackFun) -> i32 { + info!(LOG_LABEL,"enter ReadClientPackets"); + if let Some(obj) = StreamBuffer::as_mut(object) { + obj.read_client_packets(stream_client, callback_fun); + BufferStatusCode::Ok.into() + } else { + BufferStatusCode::ReadClientPacketsFail.into() + } +} +/// StreamBufferReadBuf +/// +/// # Safety +/// +/// object is valid +#[no_mangle] +pub unsafe extern "C" fn StreamBufferReadBuf(object: *const StreamBuffer) -> *const c_char { + if let Some(obj) = StreamBuffer::as_ref(object) { + obj.read_buf() + } else { + std::ptr::null() + } +} +/// StreamBufferGetRcount +/// +/// # Safety +/// +/// object is valid +#[no_mangle] +pub unsafe extern "C" fn StreamBufferGetRcount(object: *const StreamBuffer) -> i32 { + if let Some(obj) = StreamBuffer::as_ref(object) { + obj.r_count() as i32 + } else { + BufferStatusCode::RcountFail.into() + } +} +/// StreamBufferGetWcount +/// +/// # Safety +/// +/// object is valid +#[no_mangle] +pub unsafe extern "C" fn StreamBufferGetWcount(object: *const StreamBuffer) -> i32 { + if let Some(obj) = StreamBuffer::as_ref(object) { + obj.w_count() as i32 + } else { + BufferStatusCode::WcountFail.into() + } +} +/// StreamBufferGetWpos +/// +/// # Safety +/// +/// object is valid +#[no_mangle] +pub unsafe extern "C" fn StreamBufferGetWpos(object: *const StreamBuffer) -> i32 { + if let Some(obj) = StreamBuffer::as_ref(object) { + obj.w_pos() as i32 + } else { + BufferStatusCode::WposFail.into() + } +} +/// StreamBufferGetRpos +/// +/// # Safety +/// +/// object is valid +#[no_mangle] +pub unsafe extern "C" fn StreamBufferGetRpos(object: *const StreamBuffer) -> i32 { + if let Some(obj) = StreamBuffer::as_ref(object) { + obj.r_pos() as i32 + } else { + BufferStatusCode::RposFail.into() + } +} +/// StreamBufferGetSzBuff +/// +/// # Safety +/// +/// object is valid +#[no_mangle] +pub unsafe extern "C" fn StreamBufferGetSzBuff(object: *const StreamBuffer) -> *const c_char { + if let Some(obj) = StreamBuffer::as_ref(object) { + obj.sz_buff() + } else { + std::ptr::null() + } +} +/// StreamBufferSetRwErrStatus +/// +/// # Safety +/// +/// object is valid +#[no_mangle] +pub unsafe extern "C" fn StreamBufferSetRwErrStatus(object: *mut StreamBuffer, rw_error_status: ErrorStatus) -> i32 { + if let Some(obj) = StreamBuffer::as_mut(object) { + obj.set_rw_error_status(rw_error_status); + BufferStatusCode::Ok.into() + } else { + BufferStatusCode::SetRwErrStatusFail.into() + } +} +/// StreamBufferSetRpos +/// +/// # Safety +/// +/// object is valid +#[no_mangle] +pub unsafe extern "C" fn StreamBufferSetRpos(object: *mut StreamBuffer, r_pos: i32) -> i32 { + if let Some(obj) = StreamBuffer::as_mut(object) { + obj.set_r_pos(r_pos as usize); + BufferStatusCode::Ok.into() + } else { + BufferStatusCode::SetRposFail.into() + } +} + diff --git a/rust/utils/socket_ipc_rust_ffi/src/stream_session.rs b/rust/utils/socket_ipc_rust_ffi/src/stream_session.rs new file mode 100644 index 0000000000000000000000000000000000000000..3a14f74654de602958cb208536a43e582592d95f --- /dev/null +++ b/rust/utils/socket_ipc_rust_ffi/src/stream_session.rs @@ -0,0 +1,190 @@ +/* + * Copyright (C) 2023 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// provide C interface to C++ for calling +pub mod ffi; +use hilog_rust::{debug, error, hilog, HiLogLabel, LogType}; +use libc::{int32_t, int64_t, c_int}; +use std::{ffi::{CString, c_char}, thread::sleep, time::Duration}; +use crate::net_packet::NetPacket; +use crate::stream_buffer::StreamBuffer; +const LOG_LABEL: HiLogLabel = HiLogLabel { + log_type: LogType::LogCore, + domain: 0xD002220, + tag: "StreamSession" +}; +const MAX_PACKET_BUF_SIZE: usize = 256; +const SEND_RETRY_LIMIT: i32 = 32; +const SEND_RETRY_SLEEP_TIME: u64 = 10000; + +#[repr(C)] +struct EventTime { + id: int32_t, + event_time: int64_t, + timer_id: int32_t, +} + +/// struct stream session +#[repr(C)] +pub struct StreamSession { + /// module_type field + pub module_type: i32, + /// fd field + pub fd: i32, + /// uid field + pub uid : i32, + /// pid field + pub pid: i32, + /// token type field + pub token_type: i32, +} + +impl Default for StreamSession { + fn default() -> Self { + Self { + module_type: -1, + fd: -1, + uid: -1, + pid: -1, + token_type: -1, + } + } +} + +impl StreamSession { + /// return const referance of self + /// + /// # Safety + /// + /// Makesure object is null pointer + unsafe fn as_ref<'a>(object: *const Self) -> Option<&'a Self>{ + object.as_ref() + } + /// return mutable referance of self + /// + /// # Safety + /// + /// Makesure object is null pointer + unsafe fn as_mut<'a>(object: *mut Self) -> Option<&'a mut Self>{ + object.as_mut() + } + + fn uid(&self) -> i32 { + self.uid + } + + fn pid(&self) -> i32 { + self.pid + } + + fn module_type(&self) -> i32 { + self.module_type + } + + fn session_fd(&self) -> i32 { + self.fd + } + + fn set_token_type(&mut self, style: i32) { + self.token_type = style + } + + fn set_uid(&mut self, uid: i32) { + self.uid = uid + } + + fn set_pid(&mut self, pid: i32) { + self.pid = pid + } + + fn set_fd(&mut self, fd: i32) { + self.fd = fd + } + + fn token_type(&self) -> i32 { + self.token_type + } + + fn session_close(&mut self) { + debug!(LOG_LABEL, "Enter fd_:{}.", self.fd); + if self.fd >= 0 { + unsafe { + libc::close(self.fd as c_int); + } + self.fd = -1; + } + } + + fn session_send_msg(&self, buf: *const c_char, size: usize) -> bool { + if buf.is_null() { + error!(LOG_LABEL, "buf is null"); + return false; + } + if size == 0 || size > MAX_PACKET_BUF_SIZE { + error!(LOG_LABEL, "size is either equal to 0 or greater than MAX_PACKET_BUF_SIZE, size: {}", size); + return false; + } + if self.fd < 0 { + error!(LOG_LABEL, "The fd is less than 0, fd: {}", self.fd); + return false; + } + let mut idx: usize = 0; + let mut retry_count: i32 = 0; + let buf_size = size; + let mut rem_size = buf_size; + while rem_size > 0 && retry_count < SEND_RETRY_LIMIT { + retry_count += 1; + let count; + let errno; + // safety: call libc library function which is unsafe function + unsafe { + count = libc::send(self.fd as c_int, buf.add(idx) as *const libc::c_void, + rem_size, libc::MSG_DONTWAIT | libc::MSG_NOSIGNAL); + errno = *libc::__errno_location(); + }; + if count < 0 { + if errno == libc::EAGAIN || errno == libc::EINTR || errno == libc::EWOULDBLOCK { + sleep(Duration::from_micros(SEND_RETRY_SLEEP_TIME)); + error!(LOG_LABEL, "Continue for errno EAGAIN|EINTR|EWOULDBLOCK, errno:{}", errno); + continue; + } + error!(LOG_LABEL, "Send return failed,error:{} fd:{}", errno, self.fd); + return false; + } + idx += count as usize; + rem_size -= count as usize; + if rem_size > 0 { + sleep(Duration::from_micros(SEND_RETRY_SLEEP_TIME)); + } + } + if retry_count >= SEND_RETRY_LIMIT || rem_size != 0 { + error!(LOG_LABEL, "Send too many times:{}/{},size:{}/{} fd:{}", + retry_count, SEND_RETRY_LIMIT, idx, buf_size, self.fd); + return false; + } + true + } + + /// session send message + pub fn send_msg_pkt(&self, pkt: &NetPacket) -> bool { + if pkt.stream_buffer.chk_rwerror() { + error!(LOG_LABEL, "Read and write status is error"); + return false; + } + let mut buf: StreamBuffer = Default::default(); + pkt.make_data(&mut buf); + self.session_send_msg(buf.data(), buf.size()) + } +} diff --git a/rust/utils/socket_ipc_rust_ffi/src/stream_session/ffi.rs b/rust/utils/socket_ipc_rust_ffi/src/stream_session/ffi.rs new file mode 100644 index 0000000000000000000000000000000000000000..496ed19e7fda887d0bf24bb15df507afdf410fd8 --- /dev/null +++ b/rust/utils/socket_ipc_rust_ffi/src/stream_session/ffi.rs @@ -0,0 +1,205 @@ +/* + * Copyright (C) 2023 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use super::*; +use crate::error::SessionStatusCode; +use hilog_rust::{info, hilog, HiLogLabel, LogType}; +const LOG_LABEL: HiLogLabel = HiLogLabel { + log_type: LogType::LogCore, + domain: 0xD002220, + tag: "stream_session_ffi" +}; +/// create unique_ptr of stream_session for C++ +/// +/// # Safety +/// +/// object must be valid +#[no_mangle] +pub unsafe extern "C" fn StreamSessionCreate() -> *mut StreamSession { + let stream_session: Box:: = Box::default(); + Box::into_raw(stream_session) +} +/// drop unique_ptr of stream_session for C++ +/// +/// # Safety +/// +/// object must be valid +#[no_mangle] +pub unsafe extern "C" fn StreamSessionDelete(raw: *mut StreamSession) { + if !raw.is_null() { + drop(Box::from_raw(raw)); + } +} +/// StreamSessionSetUid +/// +/// # Safety +/// +/// object must be valid +#[no_mangle] +pub unsafe extern "C" fn StreamSessionSetUid(object: *mut StreamSession, uid: i32) -> i32 { + info!(LOG_LABEL, "enter StreamSessionSetUid"); + if let Some(obj) = StreamSession::as_mut(object) { + obj.set_uid(uid); + SessionStatusCode::Ok.into() + } else { + SessionStatusCode::SetUidFail.into() + } +} +/// StreamSessionSetFd +/// +/// # Safety +/// +/// object must be valid +#[no_mangle] +pub unsafe extern "C" fn StreamSessionSetFd(object: *mut StreamSession, fd: i32) -> i32 { + info!(LOG_LABEL, "enter StreamSessionSetFd"); + if let Some(obj) = StreamSession::as_mut(object) { + obj.set_fd(fd); + SessionStatusCode::Ok.into() + } else { + SessionStatusCode::SetFdFail.into() + } +} +/// StreamSessionSetPid +/// +/// # Safety +/// +/// object must be valid +#[no_mangle] +pub unsafe extern "C" fn StreamSessionSetPid(object: *mut StreamSession, pid: i32) -> i32 { + info!(LOG_LABEL, "enter StreamSessionSetPid"); + if let Some(obj) = StreamSession::as_mut(object) { + obj.set_pid(pid); + SessionStatusCode::Ok.into() + } else { + SessionStatusCode::SetPidFail.into() + } +} +/// StreamSessionGetUid +/// +/// # Safety +/// +/// object must be valid +#[no_mangle] +pub unsafe extern "C" fn StreamSessionGetUid(object: *const StreamSession) -> i32 { + info!(LOG_LABEL, "enter StreamSessionGetUid"); + if let Some(obj) = StreamSession::as_ref(object) { + obj.uid() + } else { + SessionStatusCode::UidFail.into() + } +} +/// StreamSessionGetPid +/// +/// # Safety +/// +/// object must be valid +#[no_mangle] +pub unsafe extern "C" fn StreamSessionGetPid(object: *const StreamSession) -> i32 { + info!(LOG_LABEL, "enter StreamSessionGetPid"); + if let Some(obj) = StreamSession::as_ref(object) { + obj.pid() + } else { + SessionStatusCode::PidFail.into() + } +} + +/// get session fd +/// +/// # Safety +/// +/// object must be valid +#[no_mangle] +pub unsafe extern "C" fn StreamSessionGetFd(object: *const StreamSession) -> i32 { + info!(LOG_LABEL, "enter StreamSessionGetFd"); + if let Some(obj) = StreamSession::as_ref(object) { + obj.session_fd() + } else { + SessionStatusCode::FdFail.into() + } +} +/// get token type +/// +/// # Safety +/// +/// object must be valid +#[no_mangle] +pub unsafe extern "C" fn StreamSessionSetTokenType(object: *mut StreamSession, style: i32) -> i32 { + info!(LOG_LABEL, "enter StreamSessionSetTokenType"); + if let Some(obj) = StreamSession::as_mut(object) { + obj.set_token_type(style); + SessionStatusCode::Ok.into() + } else { + SessionStatusCode::SetTokenTypeFail.into() + } +} +/// get token type +/// +/// # Safety +/// +/// object must be valid +#[no_mangle] +pub unsafe extern "C" fn StreamSessionGetTokenType(object: *const StreamSession) -> i32 { + info!(LOG_LABEL, "enter StreamSessionGetTokenType"); + if let Some(obj) = StreamSession::as_ref(object) { + obj.token_type() + } else { + SessionStatusCode::TokenTypeFail.into() + } +} +/// get module_type +/// +/// # Safety +/// +/// object must be valid +#[no_mangle] +pub unsafe extern "C" fn StreamSessionGetModuleType(object: *const StreamSession) -> i32 { + info!(LOG_LABEL, "enter StreamSessionGetModuleType"); + if let Some(obj) = StreamSession::as_ref(object) { + obj.module_type() + } else { + SessionStatusCode::ModuleTypeFail.into() + } +} +/// get session close +/// +/// # Safety +/// +/// object must be valid +#[no_mangle] +pub unsafe extern "C" fn StreamSessionClose(object: *mut StreamSession) -> i32 { + info!(LOG_LABEL, "enter StreamSessionClose"); + if let Some(obj) = StreamSession::as_mut(object) { + obj.session_close(); + SessionStatusCode::Ok.into() + } else { + SessionStatusCode::CloseFail.into() + } +} + +/// session send message +/// +/// # Safety +/// +/// object must be valid +#[no_mangle] +pub unsafe extern "C" fn StreamSessionSendMsg(object: *const StreamSession, buf: *const c_char, size: usize) -> bool { + info!(LOG_LABEL, "enter StreamSessionSendMsg"); + if let Some(obj) = StreamSession::as_ref(object) { + obj.session_send_msg(buf, size) + } else { + false + } +} \ No newline at end of file diff --git a/sensor.gni b/sensor.gni index f3374d93cb01c55e2124c6c7d37daa684f40cb9c..0a8dccb42e4747851ae0df3aa87106c63c5ef52f 100644 --- a/sensor.gni +++ b/sensor.gni @@ -13,4 +13,14 @@ import("//build/ohos.gni") +declare_args() { + rust_socket_ipc = true +} + SUBSYSTEM_DIR = "//base/sensors/sensor" + +sensor_default_defines = [] + +if (rust_socket_ipc) { + sensor_default_defines += [ "OHOS_BUILD_ENABLE_RUST" ] +} diff --git a/services/sensor/BUILD.gn b/services/sensor/BUILD.gn index da1031c44a5e3782f817efc8306a5de35cfb0953..f0d870e1cfe88b25a5a8f5ce96238a13479cf629 100644 --- a/services/sensor/BUILD.gn +++ b/services/sensor/BUILD.gn @@ -11,6 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import("//base/sensors/sensor/sensor.gni") import("//build/ohos.gni") import("./../../sensor.gni") @@ -44,11 +45,17 @@ ohos_shared_library("libsensor_service") { "$SUBSYSTEM_DIR/utils/ipc/include", ] + defines = sensor_default_defines + deps = [ "$SUBSYSTEM_DIR/utils/common:libsensor_utils", "$SUBSYSTEM_DIR/utils/ipc:libsensor_ipc", ] + if (rust_socket_ipc) { + deps += [ "$SUBSYSTEM_DIR/rust/utils/socket_ipc_rust_ffi:sensor_rust_util_ffi" ] + } + external_deps = [ "access_token:libaccesstoken_sdk", "c_utils:utils", diff --git a/services/sensor/src/sensor_power_policy.cpp b/services/sensor/src/sensor_power_policy.cpp index 3c5ab4743d42efa06461c76ab75224b525c1bcd7..0aece16fb6d2fe8d2fdab32c35d1ce43dd50cb4c 100644 --- a/services/sensor/src/sensor_power_policy.cpp +++ b/services/sensor/src/sensor_power_policy.cpp @@ -220,7 +220,11 @@ void SensorPowerPolicy::ReportActiveInfo(const ActiveInfo &activeInfo, NetPacket pkt(MessageId::ACTIVE_INFO); pkt << activeInfo.GetPid() << activeInfo.GetSensorId() << activeInfo.GetSamplingPeriodNs() << activeInfo.GetMaxReportDelayNs(); +#ifdef OHOS_BUILD_ENABLE_RUST + if (StreamBufferChkRWError(pkt.streamBufferPtr_.get())) { +#else if (pkt.ChkRWError()) { +#endif // OHOS_BUILD_ENABLE_RUST SEN_HILOGE("Packet write data failed"); return; } diff --git a/utils/ipc/BUILD.gn b/utils/ipc/BUILD.gn index 2792d7b88f9134d786a516498c0501f56ce6b11a..3710dd37b75c5fdd05a6d8d4d1ebbefec84d1b44 100644 --- a/utils/ipc/BUILD.gn +++ b/utils/ipc/BUILD.gn @@ -11,6 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import("//base/sensors/sensor/sensor.gni") import("//build/ohos.gni") import("./../../sensor.gni") @@ -28,6 +29,12 @@ ohos_shared_library("libsensor_ipc") { "$SUBSYSTEM_DIR/utils/ipc/include", ] + defines = sensor_default_defines + + if (rust_socket_ipc) { + deps = [ "$SUBSYSTEM_DIR/rust/utils/socket_ipc_rust_ffi:sensor_rust_util_ffi" ] + } + external_deps = [ "access_token:libaccesstoken_sdk", "c_utils:utils", diff --git a/utils/ipc/include/rust_binding.h b/utils/ipc/include/rust_binding.h new file mode 100644 index 0000000000000000000000000000000000000000..b66b1586033c3619e9543e7d7a2c20b22692027a --- /dev/null +++ b/utils/ipc/include/rust_binding.h @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2023 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef RUST_BINDING_H +#define RUST_BINDING_H +#include +#include +#include "proto.h" + +extern "C" { + struct RustStreamSocket; + struct RustStreamSession; + struct RustStreamBuffer; + struct RustNetPacket { + OHOS::Sensors::MessageId msgId { OHOS::Sensors::MessageId::INVALID }; + struct RustStreamBuffer* streamBuffer; + }; + RustStreamSocket* StreamSocketCreate(void); + void StreamSocketDelete(RustStreamSocket* raw); + int32_t StreamSocketGetFd(const RustStreamSocket* rustStreamSocket); + int32_t StreamSocketGetEpollFd(const RustStreamSocket* rustStreamSocket); + int32_t StreamSocketEpollCreate(RustStreamSocket* rustStreamSocket, int32_t fd); + int32_t StreamSocketEpollCtl(RustStreamSocket* rustStreamSocket, int32_t fd, int32_t op, struct epoll_event* event, int32_t epollFd); + int32_t StreamSocketEpollWait(RustStreamSocket* rustStreamSocket, struct epoll_event* events, int32_t maxevents, int32_t timeout, int32_t epollFd); + int32_t StreamSocketEpollClose(RustStreamSocket* rustStreamSocket); + int32_t StreamSocketClose(RustStreamSocket* rustStreamSocket); + int32_t StreamSocketSetFd(RustStreamSocket* rustStreamSocket, int32_t fd); + RustStreamSession* StreamSessionCreate(void); + void StreamSessionDelete(RustStreamSession* raw); + void StreamSessionSetUid(RustStreamSession* rustStreamSession, int32_t uid); + void StreamSessionSetPid(RustStreamSession* rustStreamSession, int32_t pid); + void StreamSessionSetFd(RustStreamSession* rustStreamSession, int32_t fd); + void StreamSessionClose(RustStreamSession* rustStreamSession); + bool StreamSessionSendMsg(const RustStreamSession* rustStreamSession, const char* buf, size_t size); + int32_t StreamSessionGetUid(const RustStreamSession* rustStreamSession); + int32_t StreamSessionGetPid(const RustStreamSession* rustStreamSession); + int32_t StreamSessionGetFd(const RustStreamSession* rustStreamSession); + void StreamSessionSetTokenType(RustStreamSession* rustStreamSession, int32_t type); + int32_t StreamSessionGetTokenType(const RustStreamSession* rustStreamSession); + int32_t StreamSessionGetModuleType(const RustStreamSession* rustStreamSession); + RustStreamBuffer* StreamBufferCreate(void); + void StreamBufferDelete(RustStreamBuffer* raw); + int32_t StreamBufferGetWcount(const RustStreamBuffer* rustStreamBuffer); + int32_t StreamBufferGetRcount(const RustStreamBuffer* rustStreamBuffer); + int32_t StreamBufferGetWpos(const RustStreamBuffer* rustStreamBuffer); + int32_t StreamBufferGetRpos(const RustStreamBuffer* rustStreamBuffer); + const char* StreamBufferGetSzBuff(const RustStreamBuffer* rustStreamBuffer); + int32_t StreamBufferSetRwErrStatus(RustStreamBuffer* rustStreamBuffer, int32_t rwErrStatus); + int32_t StreamBufferSetRpos(RustStreamBuffer* rustStreamBuffer, int32_t rPos); + void StreamBufferReset(RustStreamBuffer* rustStreamBuffer); + void StreamBufferClean(RustStreamBuffer* rustStreamBuffer); + bool StreamBufferRead(RustStreamBuffer* rustStreamBuffer1, RustStreamBuffer* rustStreamBuffer2); + bool StreamBufferWrite(RustStreamBuffer* rustStreamBuffer1, const RustStreamBuffer* rustStreamBuffer2); + const char* StreamBufferReadBuf(const RustStreamBuffer* rustStreamBuffer); + bool StreamBufferReadChar(RustStreamBuffer* rustStreamBuffer, char* buf, size_t size); + bool StreamBufferWriteChar(RustStreamBuffer* rustStreamBuffer, const char* buf, size_t size); + const char* StreamBufferData(const RustStreamBuffer* rustStreamBuffer); + size_t StreamBufferSize(const RustStreamBuffer* rustStreamBuffer); + const char* StreamBufferGetErrorStatusRemark(const RustStreamBuffer* rustStreamBuffer); + bool StreamBufferChkRWError(const RustStreamBuffer* rustStreamBuffer); + bool StreamBufferCheckWrite(RustStreamBuffer* rustStreamBuffer, size_t size); + bool CircleStreamBufferWrite(RustStreamBuffer* rustStreamBuffer, const char* buf, size_t size); + void CircleStreamBufferCopyDataToBegin(RustStreamBuffer* rustStreamBuffer); +} +#endif // RUST_BINDING_H \ No newline at end of file diff --git a/utils/ipc/include/stream_buffer.h b/utils/ipc/include/stream_buffer.h index a7cdcfa087f6b895d5343a9fcd054626956cef53..619b99649cd3dc5e12b0de9787f1cbd9e8440614 100644 --- a/utils/ipc/include/stream_buffer.h +++ b/utils/ipc/include/stream_buffer.h @@ -26,6 +26,10 @@ #include "proto.h" #include "sensors_errors.h" +#ifdef OHOS_BUILD_ENABLE_RUST +#include "rust_binding.h" +#endif // OHOS_BUILD_ENABLE_RUST + namespace OHOS { namespace Sensors { class StreamBuffer { @@ -37,21 +41,23 @@ public: virtual ~StreamBuffer() = default; void Reset(); void Clean(); - bool SeekReadPos(size_t n); bool Read(std::string &buf); bool Read(StreamBuffer &buf); bool Read(char *buf, size_t size); bool Write(const std::string &buf); bool Write(const StreamBuffer &buf); virtual bool Write(const char *buf, size_t size); - bool IsEmpty() const; +#ifndef OHOS_BUILD_ENABLE_RUST bool ChkRWError() const; + bool SeekReadPos(size_t n); + bool IsEmpty() const; size_t Size() const; size_t UnreadSize() const; size_t GetAvailableBufSize() const; const std::string& GetErrorStatusRemark() const; const char* Data() const; - + const char *WriteBuf() const; +#endif // OHOS_BUILD_ENABLE_RUST template bool Read(T &data); template @@ -61,7 +67,7 @@ public: template bool Write(const std::vector &data); const char *ReadBuf() const; - const char *WriteBuf() const; + template StreamBuffer &operator >> (T &data); template @@ -70,6 +76,11 @@ public: protected: bool Clone(const StreamBuffer &buf); +#ifdef OHOS_BUILD_ENABLE_RUST +public: + std::unique_ptr streamBufferPtr_ + { StreamBufferCreate(), StreamBufferDelete }; +#else enum class ErrorStatus { ERROR_STATUS_OK, ERROR_STATUS_READ, @@ -81,14 +92,22 @@ protected: size_t rPos_ { 0 }; size_t wPos_ { 0 }; char szBuff_[MAX_STREAM_BUF_SIZE + 1] = {}; +#endif // OHOS_BUILD_ENABLE_RUST + }; template bool StreamBuffer::Read(T &data) { if (!Read(reinterpret_cast(&data), sizeof(data))) { +#ifdef OHOS_BUILD_ENABLE_RUST + const char* s = StreamBufferGetErrorStatusRemark(streamBufferPtr_.get()); + SEN_HILOGE("[%{public}s] size:%{public}zu count:%{public}d", + s, sizeof(data), StreamBufferGetRcount(streamBufferPtr_.get()) + 1); +#else SEN_HILOGE("%{public}s, size:%{public}zu, count:%{public}zu", GetErrorStatusRemark().c_str(), sizeof(data), rCount_ + 1); +#endif // OHOS_BUILD_ENABLE_RUST return false; } return true; @@ -98,8 +117,14 @@ template bool StreamBuffer::Write(const T &data) { if (!Write(reinterpret_cast(&data), sizeof(data))) { +#ifdef OHOS_BUILD_ENABLE_RUST + const char* s = StreamBufferGetErrorStatusRemark(streamBufferPtr_.get()); + SEN_HILOGE("[%{public}s] size:%{public}zu,count:%{public}d", + s, sizeof(data), StreamBufferGetWcount(streamBufferPtr_.get()) + 1); +#else SEN_HILOGE("%{public}s, size:%{public}zu, count:%{public}zu", GetErrorStatusRemark().c_str(), sizeof(data), wCount_ + 1); +#endif // OHOS_BUILD_ENABLE_RUST return false; } return true; diff --git a/utils/ipc/include/stream_session.h b/utils/ipc/include/stream_session.h index 55a991e9fe86923c4366fb743bf863020add45fb..e0f25ae0312ba73461a44f779e7d1105793edca0 100644 --- a/utils/ipc/include/stream_session.h +++ b/utils/ipc/include/stream_session.h @@ -28,6 +28,10 @@ #include "net_packet.h" #include "proto.h" +#ifdef OHOS_BUILD_ENABLE_RUST +#include "rust_binding.h" +#endif // OHOS_BUILD_ENABLE_RUST + namespace OHOS { namespace Sensors { @@ -61,10 +65,15 @@ protected: std::map> events_; std::string descript_; const std::string programName_; +#ifdef OHOS_BUILD_ENABLE_RUST + std::unique_ptr streamSessionPtr_ + { StreamSessionCreate(), StreamSessionDelete }; +#else int32_t fd_ { -1 }; const int32_t uid_ { -1 }; const int32_t pid_ { -1 }; int32_t tokenType_ { ATokenTypeEnum::TOKEN_INVALID }; +#endif // OHOS_BUILD_ENABLE_RUST }; } // namespace Sensors } // namespace OHOS diff --git a/utils/ipc/include/stream_socket.h b/utils/ipc/include/stream_socket.h index d70d6d1c12e91e00862f84d91b0aa8d8190a9b08..f112f52ac7ddd347245862c6122128e5d3996ce9 100644 --- a/utils/ipc/include/stream_socket.h +++ b/utils/ipc/include/stream_socket.h @@ -31,29 +31,37 @@ #include "circle_stream_buffer.h" #include "net_packet.h" +#ifdef OHOS_BUILD_ENABLE_RUST +#include "rust_binding.h" +#endif // OHOS_BUILD_ENABLE_RUST namespace OHOS { namespace Sensors { class StreamSocket { public: - using PacketCallBackFun = std::function; + using PacketCallBackFun = std::function; StreamSocket(); virtual ~StreamSocket(); int32_t EpollCreate(int32_t size); int32_t EpollCtl(int32_t fd, int32_t op, struct epoll_event &event, int32_t epollFd = -1); int32_t EpollWait(struct epoll_event &events, int32_t maxevents, int32_t timeout, int32_t epollFd = -1); - void OnReadPackets(CircleStreamBuffer &buf, PacketCallBackFun callbackFun); void EpollClose(); void Close(); int32_t GetFd() const; int32_t GetEpollFd() const; - +#ifndef OHOS_BUILD_ENABLE_RUST + void OnReadPackets(CircleStreamBuffer &buf, PacketCallBackFun callbackFun); +#endif // OHOS_BUILD_ENABLE_RUST DISALLOW_COPY_AND_MOVE(StreamSocket); - protected: - int32_t fd_ { -1 }; - int32_t epollFd_ { -1 }; +#ifdef OHOS_BUILD_ENABLE_RUST + std::unique_ptr streamSocketPtr_ + { StreamSocketCreate(), StreamSocketDelete }; +#else + int32_t fd_{-1}; + int32_t epollFd_{-1}; +#endif // OHOS_BUILD_ENABLE_RUST }; } // namespace Sensors } // namespace OHOS -#endif // STREAM_SOCKET_H \ No newline at end of file +#endif // STREAM_SOCKET_H \ No newline at end of file diff --git a/utils/ipc/src/circle_stream_buffer.cpp b/utils/ipc/src/circle_stream_buffer.cpp index c086b695719ef5dcfe11925ea450a93cf63f9f53..ccdafc97f1263bee91d1753b5b4b7749aba825ca 100644 --- a/utils/ipc/src/circle_stream_buffer.cpp +++ b/utils/ipc/src/circle_stream_buffer.cpp @@ -1,57 +1,69 @@ -/* - * Copyright (c) 2023 Huawei Device Co., Ltd. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "circle_stream_buffer.h" - -#include "sensors_errors.h" - -namespace OHOS { -namespace Sensors { -bool CircleStreamBuffer::CheckWrite(size_t size) -{ - size_t availSize = GetAvailableBufSize(); - if (size > availSize && rPos_ > 0) { - CopyDataToBegin(); - availSize = GetAvailableBufSize(); - } - return (availSize >= size); -} - -bool CircleStreamBuffer::Write(const char *buf, size_t size) -{ - if (!CheckWrite(size)) { - SEN_HILOGE("Buffer is overflow, availableSize:%{public}zu, size:%{public}zu," - "unreadSize:%{public}zu, rPos:%{public}zu, wPos:%{public}zu", - GetAvailableBufSize(), size, UnreadSize(), rPos_, wPos_); - return false; - } - return StreamBuffer::Write(buf, size); -} - -void CircleStreamBuffer::CopyDataToBegin() -{ - size_t unreadSize = UnreadSize(); - if (unreadSize > 0 && rPos_ > 0) { - size_t pos = 0; - for (size_t i = rPos_; i <= wPos_;) { - szBuff_[pos++] = szBuff_[i++]; - } - } - SEN_HILOGD("UnreadSize:%{public}zu, rPos:%{public}zu, wPos:%{public}zu", unreadSize, rPos_, wPos_); - rPos_ = 0; - wPos_ = unreadSize; -} -} // namespace Sensors +/* + * Copyright (c) 2023 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "circle_stream_buffer.h" + +#include "sensors_errors.h" + +namespace OHOS { +namespace Sensors { +bool CircleStreamBuffer::CheckWrite(size_t size) +{ +#ifdef OHOS_BUILD_ENABLE_RUST + return StreamBufferCheckWrite(streamBufferPtr_.get(), size); +#else + size_t availSize = GetAvailableBufSize(); + if (size > availSize && rPos_ > 0) { + CopyDataToBegin(); + availSize = GetAvailableBufSize(); + } + return (availSize >= size); +#endif // OHOS_BUILD_ENABLE_RUST +} + +bool CircleStreamBuffer::Write(const char *buf, size_t size) +{ +#ifdef OHOS_BUILD_ENABLE_RUST + return CircleStreamBufferWrite(streamBufferPtr_.get(), buf, size); +#else + if (!CheckWrite(size)) { + SEN_HILOGE("Buffer is overflow, availableSize:%{public}zu, size:%{public}zu," + "unreadSize:%{public}zu, rPos:%{public}zu, wPos:%{public}zu", + GetAvailableBufSize(), size, UnreadSize(), rPos_, wPos_); + return false; + } + return StreamBuffer::Write(buf, size); +#endif // OHOS_BUILD_ENABLE_RUST +} + +void CircleStreamBuffer::CopyDataToBegin() +{ +#ifdef OHOS_BUILD_ENABLE_RUST + CircleStreamBufferCopyDataToBegin(streamBufferPtr_.get()); +#else + size_t unreadSize = UnreadSize(); + if (unreadSize > 0 && rPos_ > 0) { + size_t pos = 0; + for (size_t i = rPos_; i <= wPos_;) { + szBuff_[pos++] = szBuff_[i++]; + } + } + SEN_HILOGD("UnreadSize:%{public}zu, rPos:%{public}zu, wPos:%{public}zu", unreadSize, rPos_, wPos_); + rPos_ = 0; + wPos_ = unreadSize; +#endif // OHOS_BUILD_ENABLE_RUST +} +} // namespace Sensors } // namespace OHOS \ No newline at end of file diff --git a/utils/ipc/src/net_packet.cpp b/utils/ipc/src/net_packet.cpp index 27dec89bc2f8d14faffdfdd21b2dcb86d7716efb..a747b579fbe2b26c8bf923e34323c3dba860dfd3 100644 --- a/utils/ipc/src/net_packet.cpp +++ b/utils/ipc/src/net_packet.cpp @@ -1,62 +1,85 @@ -/* - * Copyright (c) 2023 Huawei Device Co., Ltd. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "net_packet.h" - -#include "sensors_errors.h" - -namespace OHOS { -namespace Sensors { -NetPacket::NetPacket(MessageId msgId) : msgId_(msgId) -{} - -NetPacket::NetPacket(const NetPacket &pkt) : NetPacket(pkt.GetMsgId()) -{ - Clone(pkt); -} - -void NetPacket::MakeData(StreamBuffer &buf) const -{ - PACKHEAD head = {msgId_, wPos_}; - buf << head; - if (wPos_ > 0) { - if (!buf.Write(&szBuff_[0], wPos_)) { - SEN_HILOGE("Write data to stream failed"); - return; - } - } -} - -size_t NetPacket::GetSize() const -{ - return Size(); -} - -size_t NetPacket::GetPacketLength() const -{ - return sizeof(PackHead) + wPos_; -} - -const char* NetPacket::GetData() const -{ - return Data(); -} - -MessageId NetPacket::GetMsgId() const -{ - return msgId_; -} -} // namespace Sensors -} // namespace OHOS +/* + * Copyright (c) 2023 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "net_packet.h" + +#include "sensors_errors.h" + +namespace OHOS { +namespace Sensors { +NetPacket::NetPacket(MessageId msgId) : msgId_(msgId) +{} + +NetPacket::NetPacket(const NetPacket &pkt) : NetPacket(pkt.GetMsgId()) +{ + Clone(pkt); +} + +void NetPacket::MakeData(StreamBuffer &buf) const +{ +#ifdef OHOS_BUILD_ENABLE_RUST + PACKHEAD head = {msgId_, StreamBufferGetWpos(streamBufferPtr_.get())}; + buf << head; + if (StreamBufferGetWpos(streamBufferPtr_.get()) > 0) { + if (!buf.Write(StreamBufferGetSzBuff(streamBufferPtr_.get()), StreamBufferGetWpos(streamBufferPtr_.get()))) { + SEN_HILOGE("Write data to stream failed"); + return; + } + } +#else + PACKHEAD head = {msgId_, wPos_}; + buf << head; + if (wPos_ > 0) { + if (!buf.Write(&szBuff_[0], wPos_)) { + SEN_HILOGE("Write data to stream failed"); + return; + } + } +#endif // OHOS_BUILD_ENABLE_RUST +} + +size_t NetPacket::GetSize() const +{ +#ifdef OHOS_BUILD_ENABLE_RUST + return StreamBufferSize(streamBufferPtr_.get()); +#else + return Size(); +#endif // OHOS_BUILD_ENABLE_RUST +} + +size_t NetPacket::GetPacketLength() const +{ +#ifdef OHOS_BUILD_ENABLE_RUST + return (static_cast(sizeof(PackHead)) + StreamBufferGetWpos(streamBufferPtr_.get())); +#else + return sizeof(PackHead) + wPos_; +#endif // OHOS_BUILD_ENABLE_RUST +} + +const char* NetPacket::GetData() const +{ +#ifdef OHOS_BUILD_ENABLE_RUST + return StreamBufferData(streamBufferPtr_.get()); +#else + return Data(); +#endif // OHOS_BUILD_ENABLE_RUST +} + +MessageId NetPacket::GetMsgId() const +{ + return msgId_; +} +} // namespace Sensors +} // namespace OHOS diff --git a/utils/ipc/src/stream_buffer.cpp b/utils/ipc/src/stream_buffer.cpp index e666585c20d118f34899617c002006c9a00f2729..bca854858f870fff91edd9cfa36a578ec7dd1571 100644 --- a/utils/ipc/src/stream_buffer.cpp +++ b/utils/ipc/src/stream_buffer.cpp @@ -30,36 +30,45 @@ StreamBuffer &StreamBuffer::operator=(const StreamBuffer &other) void StreamBuffer::Reset() { +#ifdef OHOS_BUILD_ENABLE_RUST + StreamBufferReset(streamBufferPtr_.get()); +#else rPos_ = 0; wPos_ = 0; rCount_ = 0; wCount_ = 0; rwErrorStatus_ = ErrorStatus::ERROR_STATUS_OK; +#endif // OHOS_BUILD_ENABLE_RUST } void StreamBuffer::Clean() { +#ifdef OHOS_BUILD_ENABLE_RUST + StreamBufferClean(streamBufferPtr_.get()); +#else Reset(); errno_t ret = memset_sp(&szBuff_, sizeof(szBuff_), 0, sizeof(szBuff_)); if (ret != EOK) { SEN_HILOGE("Call memset_s fail"); return; } +#endif // OHOS_BUILD_ENABLE_RUST } -bool StreamBuffer::SeekReadPos(size_t n) +bool StreamBuffer::Read(std::string &buf) { - size_t pos = rPos_ + n; - if (pos > wPos_) { - SEN_HILOGE("The position in the calculation is not as expected. pos:%{public}zu, [0, %{public}zu]", pos, wPos_); +#ifdef OHOS_BUILD_ENABLE_RUST + const int32_t ERROR_STATUS_READ = 1; + if (StreamBufferGetRpos(streamBufferPtr_.get()) == StreamBufferGetWpos(streamBufferPtr_.get())) { + SEN_HILOGE("Not enough memory to read, errCode:%{public}d", STREAM_BUF_READ_FAIL); + StreamBufferSetRwErrStatus(streamBufferPtr_.get(), ERROR_STATUS_READ); return false; } - rPos_ = pos; - return true; -} - -bool StreamBuffer::Read(std::string &buf) -{ + buf = ReadBuf(); + StreamBufferSetRpos(streamBufferPtr_.get(), + StreamBufferGetRpos(streamBufferPtr_.get()) + static_cast(buf.length()) + 1); + return (buf.length() > 0); +#else if (rPos_ == wPos_) { SEN_HILOGE("Not enough memory to read"); rwErrorStatus_ = ErrorStatus::ERROR_STATUS_READ; @@ -68,25 +77,37 @@ bool StreamBuffer::Read(std::string &buf) buf = ReadBuf(); rPos_ += buf.length() + 1; return (buf.length() > 0); +#endif // OHOS_BUILD_ENABLE_RUST } bool StreamBuffer::Write(const std::string &buf) { - return Write(buf.c_str(), buf.length() + 1); + return Write(buf.c_str(), buf.length()+1); } bool StreamBuffer::Read(StreamBuffer &buf) { +#ifdef OHOS_BUILD_ENABLE_RUST + return StreamBufferRead(streamBufferPtr_.get(), buf.streamBufferPtr_.get()); +#else return buf.Write(Data(), Size()); +#endif // OHOS_BUILD_ENABLE_RUST } bool StreamBuffer::Write(const StreamBuffer &buf) { +#ifdef OHOS_BUILD_ENABLE_RUST + return StreamBufferWrite(streamBufferPtr_.get(), buf.streamBufferPtr_.get()); +#else return Write(buf.Data(), buf.Size()); +#endif // OHOS_BUILD_ENABLE_RUST } bool StreamBuffer::Read(char *buf, size_t size) { +#ifdef OHOS_BUILD_ENABLE_RUST + return StreamBufferReadChar(streamBufferPtr_.get(), buf, size); +#else if (ChkRWError()) { return false; } @@ -114,10 +135,15 @@ bool StreamBuffer::Read(char *buf, size_t size) rPos_ += size; ++rCount_; return true; +#endif // OHOS_BUILD_ENABLE_RUST + } bool StreamBuffer::Write(const char *buf, size_t size) { +#ifdef OHOS_BUILD_ENABLE_RUST + return StreamBufferWriteChar(streamBufferPtr_.get(), buf, size); +#else if (ChkRWError()) { return false; } @@ -146,6 +172,43 @@ bool StreamBuffer::Write(const char *buf, size_t size) wPos_ += size; ++wCount_; return true; +#endif // OHOS_BUILD_ENABLE_RUST +} + +const char *StreamBuffer::ReadBuf() const +{ +#ifdef OHOS_BUILD_ENABLE_RUST + return StreamBufferReadBuf(streamBufferPtr_.get()); +#else + return &szBuff_[rPos_]; +#endif // OHOS_BUILD_ENABLE_RUST +} + +bool StreamBuffer::Clone(const StreamBuffer &buf) +{ + Clean(); +#ifdef OHOS_BUILD_ENABLE_RUST + return Write(StreamBufferData(buf.streamBufferPtr_.get()), StreamBufferSize(buf.streamBufferPtr_.get())); +#else + return Write(buf.Data(), buf.Size()); +#endif // OHOS_BUILD_ENABLE_RUST +} +#ifndef OHOS_BUILD_ENABLE_RUST + +bool StreamBuffer::ChkRWError() const +{ + return (rwErrorStatus_ != ErrorStatus::ERROR_STATUS_OK); +} + +bool StreamBuffer::SeekReadPos(size_t n) +{ + size_t pos = rPos_ + n; + if (pos > wPos_) { + SEN_HILOGE("The position in the calculation is not as expected. pos:%{public}zu, [0, %{public}zu]", pos, wPos_); + return false; + } + rPos_ = pos; + return true; } bool StreamBuffer::IsEmpty() const @@ -155,7 +218,11 @@ bool StreamBuffer::IsEmpty() const size_t StreamBuffer::Size() const { +#ifdef OHOS_BUILD_ENABLE_RUST + return StreamBufferSize(&rustStreamBuffer_); +#else return wPos_; +#endif // OHOS_BUILD_ENABLE_RUST } size_t StreamBuffer::UnreadSize() const @@ -168,13 +235,11 @@ size_t StreamBuffer::GetAvailableBufSize() const return ((wPos_ >= MAX_STREAM_BUF_SIZE) ? 0 : (MAX_STREAM_BUF_SIZE - wPos_)); } -bool StreamBuffer::ChkRWError() const -{ - return (rwErrorStatus_ != ErrorStatus::ERROR_STATUS_OK); -} - const std::string &StreamBuffer::GetErrorStatusRemark() const { +#ifdef OHOS_BUILD_ENABLE_RUST + return StreamBufferGetErrorStatusRemark(streamBufferPtr_.get()); +#else static const std::vector> remark { {ErrorStatus::ERROR_STATUS_OK, "OK"}, {ErrorStatus::ERROR_STATUS_READ, "READ_ERROR"}, @@ -185,27 +250,22 @@ const std::string &StreamBuffer::GetErrorStatusRemark() const return (item.first == rwErrorStatus_); }); return (tIter != remark.cend() ? tIter->second : invalidStatus); +#endif // OHOS_BUILD_ENABLE_RUST } const char *StreamBuffer::Data() const { +#ifdef OHOS_BUILD_ENABLE_RUST + return StreamBufferData(&rustStreamBuffer_); +#else return &szBuff_[0]; -} - -const char *StreamBuffer::ReadBuf() const -{ - return &szBuff_[rPos_]; +#endif // OHOS_BUILD_ENABLE_RUST } const char *StreamBuffer::WriteBuf() const { return &szBuff_[wPos_]; } - -bool StreamBuffer::Clone(const StreamBuffer &buf) -{ - Clean(); - return Write(buf.Data(), buf.Size()); -} +#endif // OHOS_BUILD_ENABLE_RUST } // namespace Sensors } // namespace OHOS diff --git a/utils/ipc/src/stream_session.cpp b/utils/ipc/src/stream_session.cpp index 4c0318db311afef0e9255b968acaffc152793fbf..e889e59424427c7f2c88caf046cdd7667fe9b828 100644 --- a/utils/ipc/src/stream_session.cpp +++ b/utils/ipc/src/stream_session.cpp @@ -34,16 +34,30 @@ constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, SENSOR_LOG_DOMAIN, "St } StreamSession::StreamSession(const std::string &programName, const int32_t fd, const int32_t uid, const int32_t pid) - : programName_(programName), + : programName_(programName) +#ifdef OHOS_BUILD_ENABLE_RUST +{ + StreamSessionSetFd(streamSessionPtr_.get(), fd); + StreamSessionSetUid(streamSessionPtr_.get(), uid); + StreamSessionSetPid(streamSessionPtr_.get(), pid); + UpdateDescript(); +} +#else +, fd_(fd), uid_(uid), pid_(pid) { UpdateDescript(); } +#endif // OHOS_BUILD_ENABLE_RUST + bool StreamSession::SendMsg(const char *buf, size_t size) const { +#ifdef OHOS_BUILD_ENABLE_RUST + return StreamSessionSendMsg(streamSessionPtr_.get(), buf, size); +#else CHKPF(buf); if ((size == 0) || (size > MAX_PACKET_BUF_SIZE)) { SEN_HILOGE("Buf size:%{public}zu", size); @@ -61,7 +75,11 @@ bool StreamSession::SendMsg(const char *buf, size_t size) const auto count = send(fd_, &buf[idx], remSize, MSG_DONTWAIT | MSG_NOSIGNAL); if (count < 0) { if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK) { +#ifdef OHOS_BUILD_ENABLE_RUST + sleep(Duration::from_micros(SEND_RETRY_SLEEP_TIME)); +#else usleep(SEND_RETRY_SLEEP_TIME); +#endif SEN_HILOGW("Continue for errno EAGAIN|EINTR|EWOULDBLOCK, errno:%{public}d", errno); continue; } @@ -71,7 +89,11 @@ bool StreamSession::SendMsg(const char *buf, size_t size) const idx += static_cast(count); remSize -= static_cast(count); if (remSize > 0) { +#ifdef OHOS_BUILD_ENABLE_RUST + sleep(Duration::from_micros(SEND_RETRY_SLEEP_TIME)); +#else usleep(SEND_RETRY_SLEEP_TIME); +#endif } } if (retryCount >= SEND_RETRY_LIMIT || remSize != 0) { @@ -80,19 +102,37 @@ bool StreamSession::SendMsg(const char *buf, size_t size) const return false; } return true; +#endif // OHOS_BUILD_ENABLE_RUST } void StreamSession::Close() { +#ifdef OHOS_BUILD_ENABLE_RUST + StreamSessionClose(streamSessionPtr_.get()); + UpdateDescript(); +#else if (fd_ >= 0) { close(fd_); fd_ = -1; UpdateDescript(); } +#endif // OHOS_BUILD_ENABLE_RUST } void StreamSession::UpdateDescript() { +#ifdef OHOS_BUILD_ENABLE_RUST + std::ostringstream oss; + oss << "fd = " << StreamSessionGetFd(streamSessionPtr_.get()) + << ", programName = " << programName_ + << ", moduleType = " << StreamSessionGetModuleType(streamSessionPtr_.get()) + << ((StreamSessionGetFd(streamSessionPtr_.get()) < 0) ? ", closed" : ", opened") + << ", uid = " << StreamSessionGetUid(streamSessionPtr_.get()) + << ", pid = " << StreamSessionGetPid(streamSessionPtr_.get()) + << ", tokenType = " << StreamSessionGetTokenType(streamSessionPtr_.get()) + << std::endl; + descript_ = oss.str().c_str(); +#else std::ostringstream oss; oss << "fd = " << fd_ << ", programName = " << programName_ @@ -102,10 +142,20 @@ void StreamSession::UpdateDescript() << ", tokenType = " << tokenType_ << std::endl; descript_ = oss.str().c_str(); +#endif // OHOS_BUILD_ENABLE_RUST } bool StreamSession::SendMsg(const NetPacket &pkt) const { +#ifdef OHOS_BUILD_ENABLE_RUST + if (StreamBufferChkRWError(pkt.streamBufferPtr_.get())) { + SEN_HILOGE("Read and write status is error"); + return false; + } + StreamBuffer buf; + pkt.MakeData(buf); + return SendMsg(StreamBufferData(buf.streamBufferPtr_.get()), StreamBufferSize(buf.streamBufferPtr_.get())); +#else if (pkt.ChkRWError()) { SEN_HILOGE("Read and write status failed"); return false; @@ -113,16 +163,25 @@ bool StreamSession::SendMsg(const NetPacket &pkt) const StreamBuffer buf; pkt.MakeData(buf); return SendMsg(buf.Data(), buf.Size()); +#endif // OHOS_BUILD_ENABLE_RUST } int32_t StreamSession::GetUid() const { +#ifdef OHOS_BUILD_ENABLE_RUST + return StreamSessionGetUid(streamSessionPtr_.get()); +#else return uid_; +#endif // OHOS_BUILD_ENABLE_RUST } int32_t StreamSession::GetPid() const { +#ifdef OHOS_BUILD_ENABLE_RUST + return StreamSessionGetPid(streamSessionPtr_.get()); +#else return pid_; +#endif // OHOS_BUILD_ENABLE_RUST } SessionPtr StreamSession::GetSharedPtr() @@ -132,7 +191,11 @@ SessionPtr StreamSession::GetSharedPtr() int32_t StreamSession::GetFd() const { +#ifdef OHOS_BUILD_ENABLE_RUST + return StreamSessionGetFd(streamSessionPtr_.get()); +#else return fd_; +#endif // OHOS_BUILD_ENABLE_RUST } const std::string& StreamSession::GetDescript() const @@ -147,12 +210,20 @@ const std::string StreamSession::GetProgramName() const void StreamSession::SetTokenType(int32_t type) { +#ifdef OHOS_BUILD_ENABLE_RUST + StreamSessionSetTokenType(streamSessionPtr_.get(), type); +#else tokenType_ = type; +#endif // OHOS_BUILD_ENABLE_RUST } int32_t StreamSession::GetTokenType() const { +#ifdef OHOS_BUILD_ENABLE_RUST + return StreamSessionGetTokenType(streamSessionPtr_.get()); +#else return tokenType_; +#endif // OHOS_BUILD_ENABLE_RUST } } // namespace Sensors } // namespace OHOS \ No newline at end of file diff --git a/utils/ipc/src/stream_socket.cpp b/utils/ipc/src/stream_socket.cpp index 9dd64d540a5f11252d2f7119001b5f464d3f4c5d..768b41528b46ef8e0ce95c674f17a5376a64625b 100644 --- a/utils/ipc/src/stream_socket.cpp +++ b/utils/ipc/src/stream_socket.cpp @@ -21,20 +21,30 @@ namespace OHOS { namespace Sensors { +#ifndef OHOS_BUILD_ENABLE_RUST namespace { constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, SENSOR_LOG_DOMAIN, "StreamSocket" }; } // namespace +#endif // OHOS_BUILD_ENABLE_RUST StreamSocket::StreamSocket() {} StreamSocket::~StreamSocket() { +#ifdef OHOS_BUILD_ENABLE_RUST + StreamSocketClose(streamSocketPtr_.get()); + StreamSocketEpollClose(streamSocketPtr_.get()); +#else Close(); EpollClose(); +#endif // OHOS_BUILD_ENABLE_RUST } int32_t StreamSocket::EpollCreate(int32_t size) { +#ifdef OHOS_BUILD_ENABLE_RUST + return StreamSocketEpollCreate(streamSocketPtr_.get(), size); +#else epollFd_ = epoll_create(size); if (epollFd_ < 0) { SEN_HILOGE("Epoll create, epollFd_:%{public}d", epollFd_); @@ -42,10 +52,15 @@ int32_t StreamSocket::EpollCreate(int32_t size) SEN_HILOGI("Epoll already create, epollFd_:%{public}d", epollFd_); } return epollFd_; +#endif // OHOS_BUILD_ENABLE_RUST + } int32_t StreamSocket::EpollCtl(int32_t fd, int32_t op, struct epoll_event &event, int32_t epollFd) { +#ifdef OHOS_BUILD_ENABLE_RUST + return StreamSocketEpollCtl(streamSocketPtr_.get(), fd, op, &event, epollFd); +#else if (fd < 0) { SEN_HILOGE("Invalid fd"); return ERROR; @@ -68,10 +83,14 @@ int32_t StreamSocket::EpollCtl(int32_t fd, int32_t op, struct epoll_event &event ret, epollFd, op, fd, errno); } return ret; +#endif // OHOS_BUILD_ENABLE_RUST } int32_t StreamSocket::EpollWait(struct epoll_event &events, int32_t maxevents, int32_t timeout, int32_t epollFd) { +#ifdef OHOS_BUILD_ENABLE_RUST + return StreamSocketEpollWait(streamSocketPtr_.get(), &events, maxevents, timeout, epollFd); +#else if (epollFd < 0) { epollFd = epollFd_; } @@ -84,8 +103,54 @@ int32_t StreamSocket::EpollWait(struct epoll_event &events, int32_t maxevents, i SEN_HILOGE("Epoll_wait ret:%{public}d, errno:%{public}d", ret, errno); } return ret; +#endif // OHOS_BUILD_ENABLE_RUST +} + +void StreamSocket::EpollClose() +{ +#ifdef OHOS_BUILD_ENABLE_RUST + StreamSocketEpollClose(streamSocketPtr_.get()); +#else + if (epollFd_ >= 0) { + close(epollFd_); + epollFd_ = -1; + } +#endif // OHOS_BUILD_ENABLE_RUST +} + +void StreamSocket::Close() +{ +#ifdef OHOS_BUILD_ENABLE_RUST + StreamSocketClose(streamSocketPtr_.get()); +#else + if (fd_ >= 0) { + auto rf = close(fd_); + if (rf != 0) { + SEN_HILOGE("Socket close failed, rf:%{public}d", rf); + } + } + fd_ = -1; +#endif // OHOS_BUILD_ENABLE_RUST +} + +int32_t StreamSocket::GetFd() const +{ +#ifdef OHOS_BUILD_ENABLE_RUST + return StreamSocketGetFd(streamSocketPtr_.get()); +#else + return fd_; +#endif // OHOS_BUILD_ENABLE_RUST +} +int32_t StreamSocket::GetEpollFd() const +{ +#ifdef OHOS_BUILD_ENABLE_RUST + return StreamSocketGetEpollFd(streamSocketPtr_.get()); +#else + return epollFd_; +#endif // OHOS_BUILD_ENABLE_RUST } +#ifndef OHOS_BUILD_ENABLE_RUST void StreamSocket::OnReadPackets(CircleStreamBuffer &circBuf, StreamSocket::PacketCallBackFun callbackFun) { constexpr size_t headSize = sizeof(PackHead); @@ -127,33 +192,7 @@ void StreamSocket::OnReadPackets(CircleStreamBuffer &circBuf, StreamSocket::Pack } } } +#endif // OHOS_BUILD_ENABLE_RUST -void StreamSocket::EpollClose() -{ - if (epollFd_ >= 0) { - close(epollFd_); - epollFd_ = -1; - } -} - -void StreamSocket::Close() -{ - if (fd_ >= 0) { - auto rf = close(fd_); - if (rf != 0) { - SEN_HILOGE("Socket close failed, rf:%{public}d", rf); - } - } - fd_ = -1; -} - -int32_t StreamSocket::GetFd() const -{ - return fd_; -} -int32_t StreamSocket::GetEpollFd() const -{ - return epollFd_; -} } // namespace Sensors } // namespace OHOS \ No newline at end of file