diff --git a/interfaces/kits/hyperaio/include/hyperaio.h b/interfaces/kits/hyperaio/include/hyperaio.h index f7f64104cec9fa78cd531fc15862354d3572c36a..5ee0b2478214a7002f271c1da866a2acfc2df094 100644 --- a/interfaces/kits/hyperaio/include/hyperaio.h +++ b/interfaces/kits/hyperaio/include/hyperaio.h @@ -70,6 +70,7 @@ struct IoResponse { uint64_t userData; int32_t res; uint32_t flags; + IoResponse() = default; IoResponse(uint64_t userData, int32_t res, uint32_t flags) : userData(userData), res(res), @@ -94,9 +95,12 @@ private: DECLARE_PIMPL(HyperAio); ProcessIoResultCallBack ioResultCallBack_ = nullptr; std::thread harvestThread_; + std::thread harvestThread2_; std::atomic stopThread_ = true; + std::atomic stopThread2_ = true; std::atomic initialized_ = false; void HarvestRes(); + void HarvestRes2(); }; } } diff --git a/interfaces/kits/hyperaio/src/hyperaio.cpp b/interfaces/kits/hyperaio/src/hyperaio.cpp index 1f5dc006db5daa9b77efbde83ea1972fc9105c87..6bf89623734d8195d933466e4ffae834b981d9a3 100644 --- a/interfaces/kits/hyperaio/src/hyperaio.cpp +++ b/interfaces/kits/hyperaio/src/hyperaio.cpp @@ -38,6 +38,7 @@ std::atomic cqeCount_{0}; class HyperAio::Impl { public: io_uring uring_; + io_uring uring2_; }; static bool HasAccessIouringPermission() @@ -105,10 +106,18 @@ int32_t HyperAio::CtxInit(ProcessIoResultCallBack *callBack) HILOGE("init io_uring failed, ret = %{public}d", ret); return ret; } + ret = io_uring_queue_init(URING_QUEUE_SIZE, &pImpl_->uring2_, 0); + if (ret < 0) { + io_uring_queue_exit(&pImpl_->uring_); + HILOGE("init io_uring failed, ret = %{public}d", ret); + return ret; + } ioResultCallBack_ = *callBack; stopThread_.store(false); + stopThread2_.store(false); harvestThread_ = std::thread(&HyperAio::HarvestRes, this); + harvestThread2_ = std::thread(&HyperAio::HarvestRes2, this); initialized_.store(true); HILOGI("init hyperaio success"); return EOK; @@ -181,7 +190,7 @@ int32_t HyperAio::StartReadReqs(ReadReqs *req) uint32_t totalReqs = req->reqNum; uint32_t count = 0; for (uint32_t i = 0; i < totalReqs; i++) { - struct io_uring_sqe *sqe = GetSqeWithRetry(&pImpl_->uring_); + struct io_uring_sqe *sqe = GetSqeWithRetry(&pImpl_->uring2_); if (sqe == nullptr) { HILOGE("get sqe failed"); return -ENOMEM; @@ -195,7 +204,7 @@ int32_t HyperAio::StartReadReqs(ReadReqs *req) + "userData:" + std::to_string(readInfo->userData)); count++; if (count >= BATCH_SIZE || i == totalReqs - 1) { - int32_t ret = io_uring_submit(&pImpl_->uring_); + int32_t ret = io_uring_submit(&pImpl_->uring2_); if (ret < 0) { HILOGE("submit read reqs failed, ret = %{public}d", ret); return ret; @@ -259,9 +268,13 @@ void HyperAio::HarvestRes() HILOGI("pImpl is null"); return; } - - while (!stopThread_.load()) { - struct io_uring_cqe *cqe; + struct io_uring_cqe *cqe; + while (1) { + if (stopThread_.load()) { + break; + } + std::vector ioResponseArray; + cqe = nullptr; int32_t ret = io_uring_wait_cqe(&pImpl_->uring_, &cqe); if (ret < 0 || cqe == nullptr) { HILOGI("wait cqe failed, ret = %{public}d", ret); @@ -269,7 +282,7 @@ void HyperAio::HarvestRes() } cqeCount_++; if (cqe->res < 0) { - HILOGI("cqe failed, cqe->res = %{public}d", cqe->res); + HILOGE("cqe failed, cqe->res = %{public}d", cqe->res); } auto response = std::make_unique(cqe->user_data, cqe->res, cqe->flags); HyperaioTrace trace("harvest: userdata " + std::to_string(cqe->user_data) @@ -282,6 +295,33 @@ void HyperAio::HarvestRes() HILOGI("exit harvest thread"); } +void HyperAio::HarvestRes2() +{ + if (pImpl_ == nullptr) { + HILOGI("pImpl is null"); + return; + } + struct io_uring_cqe *cqe; + while (1) { + if (stopThread2_.load()) { + break; + } + cqe = nullptr; + int32_t ret = io_uring_wait_cqe(&pImpl_->uring2_, &cqe); + if (ret < 0 || cqe == nullptr) { + HILOGI("wait cqe failed, ret = %{public}d", ret); + continue; + } + auto response = std::make_unique(cqe->user_data, cqe->res, cqe->flags); + HyperaioTrace trace("harvest2: userdata " + std::to_string(cqe->user_data) + + " res " + std::to_string(cqe->res) + "flags " + std::to_string(cqe->flags)); + io_uring_cqe_seen(&pImpl_->uring2_, cqe); + if (ioResultCallBack_) { + ioResultCallBack_(std::move(response)); + } + } +} + int32_t HyperAio::DestroyCtx() { HILOGI("openReqCount = %{public}u, readReqCount = %{public}u, cancelReqCount = %{public}u, cqeCount = %{public}u", @@ -292,11 +332,13 @@ int32_t HyperAio::DestroyCtx() } stopThread_.store(true); - if (harvestThread_.joinable()) { + stopThread2_.store(true); + if (harvestThread_.joinable() && harvestThread2_.joinable()) { HILOGI("start harvest thread join"); harvestThread_.join(); - // This log is only printed after join() completes successfully - HILOGI("join success"); + HILOGI("join thread1 success"); + harvestThread2_.join(); + HILOGI("join thread2 success"); } if (pImpl_ != nullptr) {