diff --git a/interfaces/kits/hyperaio/include/hyperaio.h b/interfaces/kits/hyperaio/include/hyperaio.h
index f7f64104cec9fa78cd531fc15862354d3572c36a..3b328aa76d598a35a503de9e35ad8b2f44b8de7d 100644
--- a/interfaces/kits/hyperaio/include/hyperaio.h
+++ b/interfaces/kits/hyperaio/include/hyperaio.h
@@ -70,6 +70,7 @@ struct IoResponse {
     uint64_t userData;
     int32_t res;
     uint32_t flags;
+    IoResponse() = default;
     IoResponse(uint64_t userData, int32_t res, uint32_t flags)
         : userData(userData),
           res(res),
@@ -83,7 +84,7 @@ struct IoResponse {
 
 class HyperAio {
 public:
-    using ProcessIoResultCallBack = std::function<void(std::unique_ptr<IoResponse>)>;
+    using ProcessIoResultCallBack = std::function<void(std::vector<IoResponse>)>;
     uint32_t SupportIouring();
     int32_t CtxInit(ProcessIoResultCallBack *callBack);
     int32_t StartReadReqs(ReadReqs *req);
@@ -94,9 +95,13 @@ private:
     DECLARE_PIMPL(HyperAio);
     ProcessIoResultCallBack ioResultCallBack_ = nullptr;
     std::thread harvestThread_;
+    std::thread harvestThread2_;
    std::atomic<bool> stopThread_ = true;
+    std::atomic<bool> stopThread2_ = true;
    std::atomic<bool> initialized_ = false;
    void HarvestRes();
+    void HarvestRes2();
+    void HandleError(std::vector<uint64_t> &errorVec);
 };
 }
 }
diff --git a/interfaces/kits/hyperaio/src/hyperaio.cpp b/interfaces/kits/hyperaio/src/hyperaio.cpp
index 1f5dc006db5daa9b77efbde83ea1972fc9105c87..8b2c76c1864ba16880ee3ab2ce0d07dacf4bbc59 100644
--- a/interfaces/kits/hyperaio/src/hyperaio.cpp
+++ b/interfaces/kits/hyperaio/src/hyperaio.cpp
@@ -38,6 +38,7 @@ std::atomic<uint32_t> cqeCount_{0};
 class HyperAio::Impl {
 public:
     io_uring uring_;
+    io_uring uring2_;
 };
 
 static bool HasAccessIouringPermission()
@@ -58,6 +59,24 @@ static bool ValidateReqNum(uint32_t reqNum)
 {
     return reqNum > 0 && reqNum <= URING_QUEUE_SIZE - 1;
 }
+/*
+ * Returns -errno on error, or zero on success. On success 'ring'
+ * contains the necessary information to read/write to the rings.
+ */
+
+int io_uring_queue_init_with_iowq(unsigned entries, struct io_uring *ring,
+    unsigned flags, unsigned iowq_cnt)
+{
+    struct io_uring_params p;
+    if (iowq_cnt > 16) {
+        return -EINVAL;
+    }
+    memset(&p, 0, sizeof(p));
+    p.flags = flags;
+    p.resv[0] = iowq_cnt;
+    return io_uring_queue_init_params(entries, ring, &p);
+}
+
 uint32_t HyperAio::SupportIouring()
 {
     HyperaioTrace trace("SupportIouring");
@@ -100,20 +119,41 @@ int32_t HyperAio::CtxInit(ProcessIoResultCallBack *callBack)
         pImpl_ = std::make_shared<HyperAio::Impl>();
     }
-    int32_t ret = io_uring_queue_init(URING_QUEUE_SIZE, &pImpl_->uring_, 0);
+    int32_t ret = io_uring_queue_init_with_iowq(URING_QUEUE_SIZE, &pImpl_->uring_, 0, 4);
+    if (ret < 0) {
+        HILOGE("init io_uring failed, ret = %{public}d", ret);
+        return ret;
+    }
+    ret = io_uring_queue_init_with_iowq(URING_QUEUE_SIZE, &pImpl_->uring2_, 0, 4);
     if (ret < 0) {
+        io_uring_queue_exit(&pImpl_->uring_);
         HILOGE("init io_uring failed, ret = %{public}d", ret);
         return ret;
     }
     ioResultCallBack_ = *callBack;
     stopThread_.store(false);
+    stopThread2_.store(false);
     harvestThread_ = std::thread(&HyperAio::HarvestRes, this);
+    harvestThread2_ = std::thread(&HyperAio::HarvestRes2, this);
     initialized_.store(true);
     HILOGI("init hyperaio success");
     return EOK;
 }
 
+void HyperAio::HandleError(std::vector<uint64_t> &errorVec)
+{
+    std::vector<IoResponse> ioResponseArray;
+    for (auto userdata : errorVec) {
+        IoResponse ioResponse(userdata, -EBUSY, 0);
+        ioResponseArray.push_back(ioResponse);
+    }
+    if (ioResultCallBack_) {
+        ioResultCallBack_(ioResponseArray);
+    }
+    errorVec.clear();
+}
+
 int32_t HyperAio::StartOpenReqs(OpenReqs *req)
 {
     if (pImpl_ == nullptr) {
@@ -133,6 +173,7 @@ int32_t HyperAio::StartOpenReqs(OpenReqs *req)
     HyperaioTrace trace("StartOpenReqs" + std::to_string(req->reqNum));
     uint32_t totalReqs = req->reqNum;
     uint32_t count = 0;
+    std::vector<uint64_t> openInfoVec;
     for (uint32_t i = 0; i < totalReqs; i++) {
         struct io_uring_sqe *sqe = GetSqeWithRetry(&pImpl_->uring_);
         if (sqe == nullptr) {
@@ -148,10 +189,12 @@
         HyperaioTrace trace("open flags:" + std::to_string(openInfo->flags) + "mode:" +
             std::to_string(openInfo->mode) + "userData:" + std::to_string(openInfo->userData));
         count++;
+        openInfoVec.push_back(openInfo->userData);
         if (count >= BATCH_SIZE || i == totalReqs - 1) {
             int32_t ret = io_uring_submit(&pImpl_->uring_);
             if (ret < 0) {
                 HILOGE("submit open reqs failed, ret = %{public}d", ret);
+                HandleError(openInfoVec);
                 return ret;
             }
             openReqCount_ += count;
@@ -180,8 +223,9 @@ int32_t HyperAio::StartReadReqs(ReadReqs *req)
     HyperaioTrace trace("StartReadReqs" + std::to_string(req->reqNum));
     uint32_t totalReqs = req->reqNum;
     uint32_t count = 0;
+    std::vector<uint64_t> readInfoVec;
     for (uint32_t i = 0; i < totalReqs; i++) {
-        struct io_uring_sqe *sqe = GetSqeWithRetry(&pImpl_->uring_);
+        struct io_uring_sqe *sqe = GetSqeWithRetry(&pImpl_->uring2_);
         if (sqe == nullptr) {
             HILOGE("get sqe failed");
             return -ENOMEM;
         }
@@ -194,10 +238,12 @@
         HyperaioTrace trace("read len:" + std::to_string(readInfo->len) + "offset:" +
             std::to_string(readInfo->offset) + "userData:" + std::to_string(readInfo->userData));
         count++;
+        readInfoVec.push_back(readInfo->userData);
         if (count >= BATCH_SIZE || i == totalReqs - 1) {
-            int32_t ret = io_uring_submit(&pImpl_->uring_);
+            int32_t ret = io_uring_submit(&pImpl_->uring2_);
             if (ret < 0) {
                 HILOGE("submit read reqs failed, ret = %{public}d", ret);
+                HandleError(readInfoVec);
                 return ret;
             }
             readReqCount_ += count;
@@ -259,9 +305,13 @@ void HyperAio::HarvestRes()
         HILOGI("pImpl is null");
         return;
     }
-
-    while (!stopThread_.load()) {
-        struct io_uring_cqe *cqe;
+    struct io_uring_cqe *cqe;
+    while (1) {
+        if (stopThread_.load()) {
+            break;
+        }
+        std::vector<IoResponse> ioResponseArray;
+        cqe = nullptr;
         int32_t ret = io_uring_wait_cqe(&pImpl_->uring_, &cqe);
         if (ret < 0 || cqe == nullptr) {
             HILOGI("wait cqe failed, ret = %{public}d", ret);
@@ -269,19 +319,60 @@ void HyperAio::HarvestRes()
         }
         cqeCount_++;
         if (cqe->res < 0) {
-            HILOGI("cqe failed, cqe->res = %{public}d", cqe->res);
+            HILOGE("cqe failed, cqe->res = %{public}d", cqe->res);
         }
-        auto response = std::make_unique<IoResponse>(cqe->user_data, cqe->res, cqe->flags);
         HyperaioTrace trace("harvest: userdata " + std::to_string(cqe->user_data) + " res " +
             std::to_string(cqe->res) + "flags " + std::to_string(cqe->flags));
+        IoResponse ioResponse(cqe->user_data, cqe->res, cqe->flags);
+        ioResponseArray.push_back(ioResponse);
         io_uring_cqe_seen(&pImpl_->uring_, cqe);
+        while (1) {
+            ret = io_uring_peek_cqe(&pImpl_->uring_, &cqe);
+            if (ret != 0 || cqe == nullptr) {
+                break;
+            }
+            IoResponse ioResponse2(cqe->user_data, cqe->res, cqe->flags);
+            ioResponseArray.push_back(ioResponse2);
+            io_uring_cqe_seen(&pImpl_->uring_, cqe);
+        }
         if (ioResultCallBack_) {
-            ioResultCallBack_(std::move(response));
+            ioResultCallBack_(ioResponseArray);
         }
     }
     HILOGI("exit harvest thread");
 }
 
+void HyperAio::HarvestRes2()
+{
+    if (pImpl_ == nullptr) {
+        HILOGI("pImpl is null");
+        return;
+    }
+    int32_t ret = OHOS::QOS::SetThreadQos(OHOS::QOS::QosLevel::QOS_USER_INTERACTIVE);
+    if (ret < 0) {
+        HILOGE("Harvest SetThreadQos failed ret = %{public}d", ret);
+    }
+    struct io_uring_cqe *cqe;
+    while (1) {
+        if (stopThread2_.load()) {
+            break;
+        }
+        cqe = nullptr;
+        int32_t ret = io_uring_wait_cqe(&pImpl_->uring2_, &cqe);
+        if (ret < 0 || cqe == nullptr) {
+            HILOGI("wait cqe failed, ret = %{public}d", ret);
+            continue;
+        }
+        IoResponse ioResponse(cqe->user_data, cqe->res, cqe->flags);
+        std::vector<IoResponse> ioResponseArray;
+        ioResponseArray.push_back(ioResponse);
+        io_uring_cqe_seen(&pImpl_->uring2_, cqe);
+        if (ioResultCallBack_) {
+            ioResultCallBack_(ioResponseArray);
+        }
+    }
+}
+
 int32_t HyperAio::DestroyCtx()
 {
     HILOGI("openReqCount = %{public}u, readReqCount = %{public}u, cancelReqCount = %{public}u, cqeCount = %{public}u",
@@ -292,11 +383,13 @@ int32_t HyperAio::DestroyCtx()
     }
 
     stopThread_.store(true);
-    if (harvestThread_.joinable()) {
+    stopThread2_.store(true);
+    if (harvestThread_.joinable() && harvestThread2_.joinable()) {
         HILOGI("start harvest thread join");
         harvestThread_.join();
-        // This log is only printed after join() completes successfully
-        HILOGI("join success");
+        HILOGI("join thread1 success");
+        harvestThread2_.join();
+        HILOGI("join thread2 success");
     }
 
     if (pImpl_ != nullptr) {
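For reviewers, a minimal caller-side sketch of the new callback contract in this patch: the handler now receives completions in batches (std::vector<IoResponse>) instead of a single std::unique_ptr<IoResponse>, and failed submissions are reported through the same path with res set to -EBUSY by HandleError(). Namespace qualifiers, default-constructibility of HyperAio, the exact callback parameter type, and the helper name below are assumptions for illustration, not guaranteed by the patch.

// Reviewer sketch, not part of this patch. Assumptions: "hyperaio.h" is on the
// include path, HyperAio is default-constructible, namespace qualifiers are
// omitted, a non-zero SupportIouring() result means io_uring is usable, and
// CtxInit() returns EOK (0) on success.
#include <cinttypes>
#include <cstdio>
#include <vector>

#include "hyperaio.h"

static void RunBatchedCallbackExample()
{
    HyperAio aio;

    // A single callback now observes batched completions from both harvest
    // threads, plus the -EBUSY responses synthesized when io_uring_submit() fails.
    HyperAio::ProcessIoResultCallBack callBack = [](std::vector<IoResponse> responses) {
        for (const auto &rsp : responses) {
            if (rsp.res < 0) {
                std::printf("req %" PRIu64 " failed, res = %d\n", rsp.userData, rsp.res);
            } else {
                std::printf("req %" PRIu64 " done, res = %d, flags = %u\n",
                    rsp.userData, rsp.res, rsp.flags);
            }
        }
    };

    if (aio.SupportIouring() == 0) {
        return;  // io_uring unsupported or permission denied (assumed meaning of 0)
    }
    if (aio.CtxInit(&callBack) != 0) {
        return;  // ring initialization failed
    }
    // ... build OpenReqs / ReadReqs and call StartOpenReqs() / StartReadReqs() here ...
    aio.DestroyCtx();
}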