diff --git a/official/gnn/bgcf/export.py b/official/gnn/bgcf/export.py
index 2f0acafe379fea179ae732bfeccdfe88da781906..c75003b420f0769a390bdb0696e57f8345b7e2a9 100644
--- a/official/gnn/bgcf/export.py
+++ b/official/gnn/bgcf/export.py
@@ -52,13 +52,13 @@ def run_export():
     forward_net = ForwardBGCF(network)
 
-    users = Tensor(np.zeros([num_user,]).astype(np.int32))
-    items = Tensor(np.zeros([num_item,]).astype(np.int32))
-    neg_items = Tensor(np.zeros([num_item, 1]).astype(np.int32))
-    u_test_neighs = Tensor(np.zeros([num_user, config.row_neighs]).astype(np.int32))
-    u_test_gnew_neighs = Tensor(np.zeros([num_user, config.gnew_neighs]).astype(np.int32))
-    i_test_neighs = Tensor(np.zeros([num_item, config.row_neighs]).astype(np.int32))
-    i_test_gnew_neighs = Tensor(np.zeros([num_item, config.gnew_neighs]).astype(np.int32))
+    users = Tensor(np.zeros([1, num_user]).astype(np.int32))
+    items = Tensor(np.zeros([1, num_item]).astype(np.int32))
+    neg_items = Tensor(np.zeros([1, num_item]).astype(np.int32))
+    u_test_neighs = Tensor(np.zeros([1, num_user * config.row_neighs]).astype(np.int32))
+    u_test_gnew_neighs = Tensor(np.zeros([1, num_user * config.gnew_neighs]).astype(np.int32))
+    i_test_neighs = Tensor(np.zeros([1, num_item * config.row_neighs]).astype(np.int32))
+    i_test_gnew_neighs = Tensor(np.zeros([1, num_item * config.gnew_neighs]).astype(np.int32))
 
     input_data = [users, items, neg_items, u_test_neighs, u_test_gnew_neighs, i_test_neighs, i_test_gnew_neighs]
     export(forward_net, *input_data, file_name=config.file_name, file_format=config.file_format)
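The new placeholder shapes mirror how `ForwardBGCF` in callback.py now flattens and restores its inputs: every tensor enters the exported graph as rank-2 `[1, k]` and is viewed back to its true 2-D layout inside `construct`. A quick numpy sketch of that round trip, using the Gowalla sizes assumed throughout this patch (the variable names are illustrative):

```python
import numpy as np

# Gowalla dimensions used throughout this PR
num_user, num_item = 7068, 3570
row_neighs, gnew_neighs = 40, 20

u_test_neighs = np.zeros([num_user, row_neighs], np.int32)   # original 2-D layout
flat = u_test_neighs.reshape(1, num_user * row_neighs)        # layout fed to export()
restored = flat.reshape(num_user, row_neighs)                 # what ForwardBGCF's view() recovers
assert (restored == u_test_neighs).all()
```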
diff --git a/official/gnn/bgcf/om_infer/README.md b/official/gnn/bgcf/om_infer/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..0cd3e54b3d7184165668f8a19cc6b84c7042eda7
--- /dev/null
+++ b/official/gnn/bgcf/om_infer/README.md
@@ -0,0 +1,323 @@
# Offline Inference

## Preparing the Container Environment

1. Upload the source code (om_infer) to any directory on the server (for example /home/data/) and enter that directory.

The source tree is organized as follows:

```
.
|-- README.md                 # this document
|-- convert                   # AIR-to-OM conversion scripts
|   |-- bgcf.air
|   |-- bgcf.om
|   |-- convert_bgcf_om.sh
|   |-- fusion_result.json
|   `-- kernel_meta
|-- data                      # data and configuration
|   |-- config
|   |   `-- bgcf.pipeline
|   |-- eval                  # data for model accuracy validation
|   |   |-- item_deg_dict.pkl
|   |   |-- item_full_set.pkl
|   |   |-- test_inputs.pkl
|   |   |-- test_set.pkl
|   |   `-- train_set.pkl
|   `-- model                 # OM model file
|       `-- bgcf.om
`-- infer
    |-- mxbase                # mxBase inference
    |   |-- main.cpp
    |   |-- predata.py
    |   `-- run.sh
    |-- output                # mxBase inference results
    |   |-- mxbase_item_rep.txt
    |   `-- mxbase_user_rep.txt
    `-- sdk
        |-- eval.py           # SDK accuracy-validation script
        |-- eval.sh
        |-- infer.py          # SDK inference script
        `-- infer.sh
```

2. Download the SDK installation package and upload it into the project directory (e.g. /home/data/bgcf_mindspore_1.2.0_code/om_infer/).

- SDK version: 2.0.1
- Download from: [mxManufacture - Ascend Community (hiascend.com)](https://www.hiascend.com/software/mindx-sdk/sdk-detail). After downloading, check that the package is executable; if it is not, add execute permission with chmod.

Dockerfile contents:

~~~dockerfile
ARG FROM_IMAGE_NAME
# base image
FROM $FROM_IMAGE_NAME
ARG SDK_PKG
# copy the SDK package into the image
COPY ./$SDK_PKG .
# install the SDK
RUN ./$SDK_PKG --install
# apply the environment variables
RUN /bin/bash -c "source ~/.bashrc"
~~~

3. Build the inference image:

```
# without root privileges, prefix the command with "sudo"
docker build -t infer_image --build-arg FROM_IMAGE_NAME=base_image:tag --build-arg SDK_PKG=sdk_pkg .
```

| Parameter     | Description |
| ------------- | ----------- |
| *infer_image* | Name of the inference image; choose as needed. |
| *base_image*  | Base image; can be downloaded from [Ascend Hub](https://ascendhub.huawei.com/public-ascendhub/ascend-infer-x86). |
| *tag*         | Image tag; set according to your environment, e.g. 21.0.1. |
| *sdk_pkg*     | File name of the downloaded mxManufacture package, e.g. Ascend-mindxsdk-mxmanufacture_*{version}*_linux-*{arch}*.run |

Note: the trailing "." of the command must not be omitted; it denotes the current directory.

4. Start the container.

Run the following command to start a container instance:

```
bash docker_start_infer.sh docker_image:tag model_dir
```

| Parameter      | Description |
| -------------- | ----------- |
| *docker_image* | Name of the inference image, downloaded from Ascend Hub. |
| *tag*          | Image tag, e.g. 21.0.1. |
| *model_dir*    | Path mounted into the inference container; /home/data in this example. |

Starting the container mounts the inference chip and the data path into it.

docker_start_infer.sh (infer/docker_start_infer.sh) reads as follows. Adjust `--device=/dev/davinci0` to the actual chip; the comment was kept out of the command itself because a comment after a line-continuation backslash would break it.

```shell
#!/bin/bash
docker_image=$1
model_dir=$2

if [ -z "${docker_image}" ]; then
    echo "please input docker_image"
    exit 1
fi

if [ ! -d "${model_dir}" ]; then
    echo "please input model_dir"
    exit 1
fi

docker run -it \
    --device=/dev/davinci0 \
    --device=/dev/davinci_manager \
    --device=/dev/devmm_svm \
    --device=/dev/hisi_hdc \
    -v /usr/local/Ascend/driver:/usr/local/Ascend/driver \
    -v ${model_dir}:${model_dir} \
    ${docker_image} \
    /bin/bash
```

## Preprocessing

1. Model conversion

Place the AIR model in ./convert and run the conversion script: `bash convert_bgcf_om.sh xxx.air xx`. This produces the corresponding OM model.

convert_bgcf_om.sh reads as follows:

```
#!/bin/bash

if [ $# -ne 2 ]
then
    echo "Wrong parameter format."
    echo "Usage:"
    echo "  bash $0 [INPUT_AIR_PATH] [OUTPUT_OM_PATH_NAME]"
    echo "Example: "
    echo "  bash convert_om.sh xxx.air xx"

    exit 1
fi

input_air_path=$1
output_om_path=$2

echo "Input AIR file path: ${input_air_path}"
echo "Output OM file path: ${output_om_path}"

atc --input_format=NCHW \
    --framework=1 \
    --model="${input_air_path}" \
    --output="${output_om_path}" \
    --soc_version=Ascend310
```

2. Data preprocessing

Before running inference, the evaluation corpus must be preprocessed: run eval.py on ModelArts and place the resulting *.pkl files into ./data/eval.

The data generated here serves both the SDK and the mxBase inference paths.
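Misplaced or stale pickle files are the most common failure at this step. The short check below is a sketch (not part of the shipped scripts) that loads test_inputs.pkl and prints the shape of each of the seven input arrays, using the Gowalla dimensions assumed throughout this patch:

```python
import pickle as pkl

with open('./data/eval/test_inputs.pkl', 'rb') as f:
    test_inputs = pkl.load(f)

names = ['users', 'items', 'neg_items', 'u_test_neighs',
         'u_test_gnew_neighs', 'i_test_neighs', 'i_test_gnew_neighs']
for name, arr in zip(names, test_inputs[0]):
    print(name, arr.shape)  # e.g. u_test_neighs -> (7068, 40) before flattening
```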
## MindX SDK Inference

The MindX SDK pipeline file is prepared in advance under ./data/config. Its content is identical to the bgcf.pipeline file added later in this patch: seven appsrc elements (appsrc0 through appsrc6, each with blocksize 409600) feed the seven inputs of mxpi_tensorinfer0 (dataSource "appsrc0,...,appsrc6", modelPath "../../data/model/bgcf.om"), whose output is serialized by mxpi_dataserialize0 and collected by appsink0 on device 0.

Enter the ./infer/sdk subdirectory and run the SDK scripts to start inference.

SDK inference commands:

```
cd sdk
bash eval.sh    # accuracy-validation script; for plain inference run "bash infer.sh" instead
```

SDK inference results are saved under ./infer/output/sdk_result. During inference, accuracy and performance statistics are printed to the screen, for example:

```
(7068, 192)
(3570, 192)
epoch:600, recall_@10:0.10289, recall_@20:0.15531, ndcg_@10:0.07467, ndcg_@20:0.09250, sedp_@10:0.01945, sedp_@20:0.01550, nov_@10:7.57716, nov_@20:7.79069
```

## mxBase Inference

The inference code is located in infer/mxbase.

1. Run predata.py first to generate the dataset folder.

2. Then run run.sh to perform inference (before running, adjust MX_SDK_HOME in the script to your installation).

run.sh reads as follows:

```
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -e

CUR_PATH=$(cd "$(dirname "$0")" || { warn "Failed to check path/to/run.sh" ; exit ; } ; pwd)

# Simple log helper functions
info() { echo -e "\033[1;34m[INFO ][MxStream] $1\033[1;37m" ; }
warn() { echo >&2 -e "\033[1;31m[WARN ][MxStream] $1\033[1;37m" ; }

export MX_SDK_HOME="/home/xxx/mxVision-2.0.2"
export LD_LIBRARY_PATH="${MX_SDK_HOME}/lib":"${MX_SDK_HOME}/opensource/lib":"${MX_SDK_HOME}/opensource/lib64":"/usr/local/Ascend/ascend-toolkit/latest/acllib/lib64":${LD_LIBRARY_PATH}
export GST_PLUGIN_SCANNER="${MX_SDK_HOME}/opensource/libexec/gstreamer-1.0/gst-plugin-scanner"
export GST_PLUGIN_PATH="${MX_SDK_HOME}/opensource/lib/gstreamer-1.0":"${MX_SDK_HOME}/lib/plugins"

# compile
g++ main.cpp -I "${MX_SDK_HOME}/include/" -I "${MX_SDK_HOME}/opensource/include/" -L "${MX_SDK_HOME}/lib/" -L "${MX_SDK_HOME}/opensource/lib/" -L "${MX_SDK_HOME}/opensource/lib64/" -std=c++11 -D_GLIBCXX_USE_CXX11_ABI=0 -Dgoogle=mindxsdk_private -fPIC -fstack-protector-all -g -Wl,-z,relro,-z,now,-z,noexecstack -pie -Wall -lglog -lmxbase -lmxpidatatype -lplugintoolkit -lstreammanager -lcpprest -lmindxsdk_protobuf -o main

# run
./main
```

3. The results are saved in ./infer/output as two matrices, mxbase_item_rep.txt and mxbase_user_rep.txt.

*Accuracy notes:*

To reflect the inference accuracy faithfully, the model used here was trained on a 910 server with the BGCF training code provided by the MindSpore r1.3 [model zoo](https://gitee.com/mindspore/mindspore/tree/r1.3/model_zoo/official/gnn/bgcf/) on Gitee.

Because the training loss fluctuates somewhat, the accuracy reported by eval.py varies slightly from checkpoint to checkpoint. For the checkpoint frozen for this work, eval.py reported Recall@20 = 0.15524; SDK and mxBase inference with this code both measure Recall@20 = 0.15531, consistent with the trained model.

diff --git a/official/gnn/bgcf/om_infer/convert/convert_bgcf_om.sh b/official/gnn/bgcf/om_infer/convert/convert_bgcf_om.sh
new file mode 100644
index 0000000000000000000000000000000000000000..5aaee05da0f9f06a36bc6f86041f2fd1bcb1d981
--- /dev/null
+++ b/official/gnn/bgcf/om_infer/convert/convert_bgcf_om.sh
@@ -0,0 +1,25 @@
#!/bin/bash

if [ $# -ne 2 ]
then
    echo "Wrong parameter format."
+ echo "Usage:" + echo " bash $0 [INPUT_AIR_PATH] [OUTPUT_OM_PATH_NAME]" + echo "Example: " + echo " bash convert_om.sh xxx.air xx" + + exit 1 +fi + +input_air_path=$1 +output_om_path=$2 + + +echo "Input AIR file path: ${input_air_path}" +echo "Output OM file path: ${output_om_path}" + +atc --input_format=NCHW \ + --framework=1 \ + --model="${input_air_path}" \ + --output="${output_om_path}" \ + --soc_version=Ascend310 diff --git a/official/gnn/bgcf/om_infer/data/config/bgcf.pipeline b/official/gnn/bgcf/om_infer/data/config/bgcf.pipeline new file mode 100644 index 0000000000000000000000000000000000000000..464683093e5d680127cf42d7dc9b2cdd6a6e2929 --- /dev/null +++ b/official/gnn/bgcf/om_infer/data/config/bgcf.pipeline @@ -0,0 +1,78 @@ +{ + "bgcf_gnn": { + "stream_config": { + "deviceId": "0" + }, + "appsrc0": { + "props": { + "blocksize": "409600" + }, + "factory": "appsrc", + "next": "mxpi_tensorinfer0:0" + }, + "appsrc1": { + "props": { + "blocksize": "409600" + }, + "factory": "appsrc", + "next": "mxpi_tensorinfer0:1" + }, + "appsrc2": { + "props": { + "blocksize": "409600" + }, + "factory": "appsrc", + "next": "mxpi_tensorinfer0:2" + }, + "appsrc3": { + "props": { + "blocksize": "409600" + }, + "factory": "appsrc", + "next": "mxpi_tensorinfer0:3" + }, + "appsrc4": { + "props": { + "blocksize": "409600" + }, + "factory": "appsrc", + "next": "mxpi_tensorinfer0:4" + }, + "appsrc5": { + "props": { + "blocksize": "409600" + }, + "factory": "appsrc", + "next": "mxpi_tensorinfer0:5" + }, + "appsrc6": { + "props": { + "blocksize": "409600" + }, + "factory": "appsrc", + "next": "mxpi_tensorinfer0:6" + }, + "mxpi_tensorinfer0": { + "props": { + "dataSource":"appsrc0,appsrc1,appsrc2,appsrc3,appsrc4,appsrc5,appsrc6", + "modelPath": "../../data/model/bgcf.om" + }, + "factory": "mxpi_tensorinfer", + "next": "mxpi_dataserialize0" + }, + "mxpi_dataserialize0": { + "props": { + "outputDataKeys": "mxpi_tensorinfer0" + }, + "factory": "mxpi_dataserialize", + "next": "appsink0" + }, + "appsink0": { + "props": { + "blocksize": "4096000" + }, + "factory": "appsink" + } + } +} + diff --git a/official/gnn/bgcf/om_infer/data/eval/.gitkeep b/official/gnn/bgcf/om_infer/data/eval/.gitkeep new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/official/gnn/bgcf/om_infer/data/model/.gitkeep b/official/gnn/bgcf/om_infer/data/model/.gitkeep new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/official/gnn/bgcf/om_infer/infer/mxbase/main.cpp b/official/gnn/bgcf/om_infer/infer/mxbase/main.cpp new file mode 100644 index 0000000000000000000000000000000000000000..8182dc6998d03f545e17cc0af0fb2a8504b45ad3 --- /dev/null +++ b/official/gnn/bgcf/om_infer/infer/mxbase/main.cpp @@ -0,0 +1,336 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

// Standard headers used by this file (the angle-bracket includes were garbled
// in the original patch and are restored to the set the code actually needs).
#include <climits>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <vector>
#include "MxStream/StreamManager/MxStreamManager.h"
#include "MxBase/Log/Log.h"
#include "MxBase/DvppWrapper/DvppWrapper.h"
#include "MxBase/ModelInfer/ModelInferenceProcessor.h"
#include "MxBase/Tensor/TensorContext/TensorContext.h"
#include "MxTools/Proto/MxpiDataType.pb.h"
#include "MxTools/Proto/MxpiDataTypeDeleter.h"
using namespace std;

#define INT32_BYTELEN 4

// Read a whole binary file into dataBuffer (kept from the SDK sample; unused by main()).
APP_ERROR ReadFile(const std::string &filePath, MxStream::MxstDataInput &dataBuffer)
{
    char c[PATH_MAX + 1] = {0x00};
    size_t count = filePath.copy(c, PATH_MAX + 1);
    if (count != filePath.length())
    {
        LogError << "Failed to strcpy " << c;
        return APP_ERR_COMM_FAILURE;
    }

    char path[PATH_MAX + 1] = {0x00};
    if ((strlen(c) > PATH_MAX) || (realpath(c, path) == nullptr))
    {
        LogError << "Failed to get image path(" << filePath << ").";
        return APP_ERR_COMM_NO_EXIST;
    }
    FILE *fp = fopen(path, "rb");
    if (fp == nullptr)
    {
        LogError << "Failed to open file";
        return APP_ERR_COMM_OPEN_FAIL;
    }

    fseek(fp, 0, SEEK_END);
    long fileSize = ftell(fp);
    fseek(fp, 0, SEEK_SET);

    if (fileSize > 0)
    {
        dataBuffer.dataSize = fileSize;
        dataBuffer.dataPtr = new (std::nothrow) uint32_t[fileSize];
        if (dataBuffer.dataPtr == nullptr)
        {
            LogError << "allocate memory with \"new uint32_t\" failed.";
            return APP_ERR_COMM_FAILURE;
        }
        uint32_t readRet = fread(dataBuffer.dataPtr, 1, fileSize, fp);
        if (readRet <= 0)
        {
            fclose(fp);
            return APP_ERR_COMM_READ_FAIL;
        }
        fclose(fp);
        return APP_ERR_OK;
    }
    fclose(fp);
    return APP_ERR_COMM_FAILURE;
}

// Read the pipeline description file into a string.
std::string ReadPipelineConfig(std::string &pipelineConfigPath)
{
    std::ifstream file(pipelineConfigPath.c_str(), std::ifstream::binary);
    if (!file)
    {
        LogError << pipelineConfigPath << " file does not exist";
        return "";
    }
    file.seekg(0, std::ifstream::end);
    uint32_t fileSize = file.tellg();
    file.seekg(0);
    std::unique_ptr<char[]> data(new char[fileSize]);
    file.read(data.get(), fileSize);
    file.close();
    std::string pipelineConfig(data.get(), fileSize);
    return pipelineConfig;
}

// Wrap one int32 vector as a [1, vec.size()] tensor and send it to appsrc<inPluginId>.
APP_ERROR SendEachProtobuf(std::string STREAM_NAME, int inPluginId, std::vector<int> vec,
                           shared_ptr<MxStream::MxStreamManager> &mxStreamManager)
{
    auto dataSize = vec.size() * INT32_BYTELEN;
    auto dataPtr = &vec[0];
    MxBase::MemoryData memorySrc(dataPtr, dataSize, MxBase::MemoryData::MEMORY_HOST_NEW);
    MxBase::MemoryData memoryDst(dataSize, MxBase::MemoryData::MEMORY_HOST_NEW);
    APP_ERROR ret = MxBase::MemoryHelper::MxbsMallocAndCopy(memoryDst, memorySrc);
    if (ret != APP_ERR_OK)
    {
        LogError << "Fail to malloc and copy host memory.";
        return ret;
    }

    auto tensorPackageList = std::shared_ptr<MxTools::MxpiTensorPackageList>(
        new MxTools::MxpiTensorPackageList, MxTools::g_deleteFuncMxpiTensorPackageList);
    auto tensorPackage = tensorPackageList->add_tensorpackagevec();
    auto tensorVec = tensorPackage->add_tensorvec();
    tensorVec->set_tensordataptr((uint64_t)memoryDst.ptrData);
    tensorVec->set_tensordatasize(dataSize);
    tensorVec->set_tensordatatype(MxBase::TENSOR_DTYPE_INT32);
    tensorVec->set_memtype(MxTools::MXPI_MEMORY_HOST_NEW);
    tensorVec->set_deviceid(0);
    tensorVec->add_tensorshape(1);
    tensorVec->add_tensorshape(vec.size());

    MxStream::MxstProtobufIn dataBuffer;
    ostringstream dataSource;
    dataSource << "appsrc" << inPluginId;

    dataBuffer.key = dataSource.str();
    dataBuffer.messagePtr = static_pointer_cast<google::protobuf::Message>(tensorPackageList);
    vector<MxStream::MxstProtobufIn> dataBufferVec;
    dataBufferVec.push_back(dataBuffer);
    ret = mxStreamManager->SendProtobuf(STREAM_NAME, inPluginId, dataBufferVec);
    return ret;
}

// Unpack every tensor of a MxpiTensorPackageList into MxBase::TensorBase objects.
void GetTensors(
    const std::shared_ptr<MxTools::MxpiTensorPackageList> &tensorPackageList,
    std::vector<MxBase::TensorBase> &tensors)
{
    for (int i = 0; i < tensorPackageList->tensorpackagevec_size(); ++i)
    {
        for (int j = 0; j < tensorPackageList->tensorpackagevec(i).tensorvec_size(); j++)
        {
            MxBase::MemoryData memoryData = {};
            memoryData.deviceId =
                tensorPackageList->tensorpackagevec(i).tensorvec(j).deviceid();
            memoryData.type = (MxBase::MemoryData::MemoryType)tensorPackageList
                ->tensorpackagevec(i).tensorvec(j).memtype();
            memoryData.size = (uint32_t)tensorPackageList->tensorpackagevec(i)
                .tensorvec(j).tensordatasize();
            memoryData.ptrData = (void *)tensorPackageList->tensorpackagevec(i)
                .tensorvec(j).tensordataptr();
            if (memoryData.type == MxBase::MemoryData::MEMORY_HOST ||
                memoryData.type == MxBase::MemoryData::MEMORY_HOST_MALLOC ||
                memoryData.type == MxBase::MemoryData::MEMORY_HOST_NEW)
            {
                memoryData.deviceId = -1;
            }
            std::vector<uint32_t> outputShape = {};
            for (int k = 0;
                 k < tensorPackageList->tensorpackagevec(i).tensorvec(j).tensorshape_size();
                 ++k)
            {
                outputShape.push_back(
                    (uint32_t)tensorPackageList->tensorpackagevec(i)
                        .tensorvec(j).tensorshape(k));
            }
            MxBase::TensorBase tmpTensor(
                memoryData, true, outputShape,
                (MxBase::TensorDataType)tensorPackageList->tensorpackagevec(0)
                    .tensorvec(j).tensordatatype());
            tensors.push_back(tmpTensor);
        }
    }
}

// Reinterpret x as the high half of a float (kept from the sample; unused).
static float ShortToFloat(unsigned short x)
{
    float y;
    unsigned int temp;
    temp = ((unsigned int)(x << 16));

    y = (float)(*((float *)(&temp)));
    return y;
}

// Convert an IEEE-754 half-precision value to float.
static float half_to_float(unsigned short h)
{
    short *ptr;
    int fs, fe, fm, rlt;

    ptr = (short *)&h;

    fs = ((*ptr) & 0x8000) << 16;

    fe = ((*ptr) & 0x7c00) >> 10;
    fe = fe + 0x70;
    fe = fe << 23;

    fm = ((*ptr) & 0x03ff) << 13;

    rlt = fs | fe | fm;
    return *((float *)&rlt);
}

int main(int argc, char *argv[])
{
    MxStream::MxstDataInput dataInput;  // unused leftover from the SDK sample

    std::string pipelineConfigPath = "../../data/config/bgcf.pipeline";
    std::string pipelineConfig = ReadPipelineConfig(pipelineConfigPath);
    if (pipelineConfig == "")
    {
        return APP_ERR_COMM_INIT_FAIL;
    }

    auto mxStreamManager = make_shared<MxStream::MxStreamManager>();
    APP_ERROR ret = mxStreamManager->InitManager();
    if (ret != APP_ERR_OK)
    {
        LogError << "Failed to init Stream manager, ret = " << ret << ".";
        return ret;
    }
    ret = mxStreamManager->CreateMultipleStreams(pipelineConfig);
    if (ret != APP_ERR_OK)
    {
        LogError << "Failed to create Stream, ret = " << ret << ".";
        return ret;
    }

    // Unused leftover from the SDK sample; the real inputs go through SendEachProtobuf below.
    std::shared_ptr<MxTools::MxpiTensorPackageList> objectList =
        std::make_shared<MxTools::MxpiTensorPackageList>();
    MxStream::MxstProtobufIn dataBuffer;
    dataBuffer.key = "appsrc0";
    dataBuffer.messagePtr = std::static_pointer_cast<google::protobuf::Message>(objectList);
    std::vector<MxStream::MxstProtobufIn> dataBufferVec;
    dataBufferVec.push_back(dataBuffer);

    std::string streamName = "bgcf_gnn";
    int inPluginId = 0;
    std::vector<std::string> keyVec;
    keyVec.push_back("mxpi_tensorinfer0");

    // Element counts of the seven flattened inputs: users (7068), items (3570),
    // neg_items (3570), u_test_neighs (7068*40), u_test_gnew_neighs (7068*20),
    // i_test_neighs (3570*40), i_test_gnew_neighs (3570*20).
    int nums[7] = {7068, 3570, 3570, 282720, 141360, 142800, 71400};
    std::string names[7] = {"./dataset/users.txt", "./dataset/items.txt", "./dataset/neg_items.txt",
                            "./dataset/u_test_neighs.txt", "./dataset/u_test_gnew_neighs.txt",
                            "./dataset/i_test_neighs.txt", "./dataset/i_test_gnew_neighs.txt"};
    for (int i = 0; i < 7; i++)
    {
        freopen(names[i].c_str(), "r", stdin);
        std::vector<int> vec(nums[i]);
        for (int j = 0; j < nums[i]; j++)
            cin >> vec[j];
        ret = SendEachProtobuf(streamName, i, vec, mxStreamManager);
        fclose(stdin);
    }

    std::vector<MxStream::MxstProtobufOut> output =
        mxStreamManager->GetProtobuf(streamName, inPluginId, keyVec);
    if (output.size() == 0)
    {
        LogError << "output size is 0";
        return APP_ERR_ACL_FAILURE;
    }
    if (output[0].errorCode != APP_ERR_OK)
    {
        LogError << "GetProtobuf error. errorCode=" << output[0].errorCode;
        return output[0].errorCode;
    }
    LogInfo << "errorCode=" << output[0].errorCode;
    LogInfo << "key=" << output[0].messageName;
    LogInfo << "value=" << output[0].messagePtr.get()->DebugString();

    auto tensorPackageList =
        std::static_pointer_cast<MxTools::MxpiTensorPackageList>(output[0].messagePtr);
    vector<MxBase::TensorBase> tensors = {};
    GetTensors(tensorPackageList, tensors);

    std::vector<std::vector<float>> user_rep(nums[0], vector<float>(192));
    std::vector<std::vector<float>> item_rep(nums[1], vector<float>(192));

    void *tensorPtr = tensors[0].GetBuffer();

    std::vector<uint32_t> sp = tensors[0].GetShape();
    cout << "GetShape_0=" << nums[0] << '*' << sp[1] << endl;

    unsigned short *ptr = (unsigned short *)tensorPtr;

    // The model emits float16; decode and dump the user representation matrix.
    freopen("../output/mxbase_user_rep.txt", "w", stdout);
    for (int i = 0; i < nums[0]; i++)
    {
        for (int j = 0; j < 192; j++)
        {
            user_rep[i][j] = half_to_float(ptr[i * 192 + j]);
            cout << user_rep[i][j] << " ";
        }
        cout << endl;
    }

    freopen("/dev/tty", "w", stdout);

    void *tensorPtr1 = tensors[1].GetBuffer();

    std::vector<uint32_t> sp1 = tensors[1].GetShape();
    cout << "GetShape_1=" << nums[1] << '*' << sp1[1] << endl;

    unsigned short *ptr1 = (unsigned short *)tensorPtr1;

    freopen("../output/mxbase_item_rep.txt", "w", stdout);
    for (int i = 0; i < nums[1]; i++)
    {
        for (int j = 0; j < 192; j++)
        {
            item_rep[i][j] = half_to_float(ptr1[i * 192 + j]);
            cout << item_rep[i][j] << " ";
        }
        cout << endl;
    }
    freopen("/dev/tty", "w", stdout);

    cout << "user_rep[0][0..4] = " << endl;
    for (int i = 0; i < 5; i++)
    {
        cout << user_rep[0][i] << " ";
    }
    cout << endl;

    cout << "item_rep[0][0..4] = " << endl;
    for (int i = 0; i < 5; i++)
    {
        cout << item_rep[0][i] << " ";
    }
    cout << endl;

    mxStreamManager->DestroyAllStreams();
    return 0;
}
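main.cpp above converts the model's float16 output buffers by hand with half_to_float; on the Python side the same decode is a single numpy call. A minimal sketch, assuming the [7068, 192] / [3570, 192] output shapes used above (decode_rep is an illustrative helper, not part of the shipped code):

```python
import numpy as np

def decode_rep(buf: bytes, rows: int, cols: int = 192) -> np.ndarray:
    """numpy equivalent of main.cpp's half_to_float loop over a raw fp16 buffer."""
    return np.frombuffer(buf, dtype=np.float16).astype(np.float32).reshape(rows, cols)

# e.g. user_rep = decode_rep(user_buf, 7068); item_rep = decode_rep(item_buf, 3570)
```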
diff --git a/official/gnn/bgcf/om_infer/infer/mxbase/predata.py b/official/gnn/bgcf/om_infer/infer/mxbase/predata.py
new file mode 100644
index 0000000000000000000000000000000000000000..46372835a2ba96c5814ce2472941df6ff0022759
--- /dev/null
+++ b/official/gnn/bgcf/om_infer/infer/mxbase/predata.py
@@ -0,0 +1,44 @@
import os
import pickle as pkl
import numpy as np

num_user = 7068
num_item = 3570

row_neighs = 40
gnew_neighs = 20


if __name__ == '__main__':

    os.mkdir("./dataset")

    with open('../../data/eval/test_inputs.pkl', 'rb') as file:
        test_inputs = pkl.load(file)
    with open('../../data/eval/test_set.pkl', 'rb') as file:
        test_set = pkl.load(file)
    with open('../../data/eval/train_set.pkl', 'rb') as file:
        train_set = pkl.load(file)
    with open('../../data/eval/item_deg_dict.pkl', 'rb') as file:
        item_deg_dict = pkl.load(file)
    with open('../../data/eval/item_full_set.pkl', 'rb') as file:
        item_full_set = pkl.load(file, encoding="...")

    # Flatten the seven test inputs to the [1, k] layouts the exported OM model expects.
    test_input = test_inputs[0]
    users = test_input[0].reshape(1, num_user)
    items = test_input[1].reshape(1, num_item)
    neg_items = test_input[2].reshape(1, num_item)
    u_test_neighs = test_input[3].reshape([1, num_user * row_neighs])
    u_test_gnew_neighs = test_input[4].reshape([1, num_user * gnew_neighs])
    i_test_neighs = test_input[5].reshape([1, num_item * row_neighs])
    i_test_gnew_neighs = test_input[6].reshape([1, num_item * gnew_neighs])

    np.savetxt("dataset/users.txt", users, fmt='%d')
    np.savetxt("dataset/items.txt", items, fmt='%d')
    np.savetxt("dataset/neg_items.txt", neg_items, fmt='%d')
    np.savetxt("dataset/u_test_neighs.txt", u_test_neighs, fmt='%d')
    np.savetxt("dataset/u_test_gnew_neighs.txt", u_test_gnew_neighs, fmt='%d')
    np.savetxt("dataset/i_test_neighs.txt", i_test_neighs, fmt='%d')
    np.savetxt("dataset/i_test_gnew_neighs.txt", i_test_gnew_neighs, fmt='%d')
diff --git a/official/gnn/bgcf/om_infer/infer/mxbase/run.sh b/official/gnn/bgcf/om_infer/infer/mxbase/run.sh
new file mode 100644
index 0000000000000000000000000000000000000000..177c38d9463623ded4be37cac97b496810fd24dc
--- /dev/null
+++ b/official/gnn/bgcf/om_infer/infer/mxbase/run.sh
@@ -0,0 +1,33 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -e

CUR_PATH=$(cd "$(dirname "$0")" || { warn "Failed to check path/to/run.sh" ; exit ; } ; pwd)

# Simple log helper functions
info() { echo -e "\033[1;34m[INFO ][MxStream] $1\033[1;37m" ; }
warn() { echo >&2 -e "\033[1;31m[WARN ][MxStream] $1\033[1;37m" ; }

export MX_SDK_HOME="/home/sjtu_liu/mxVision-2.0.2"
export LD_LIBRARY_PATH="${MX_SDK_HOME}/lib":"${MX_SDK_HOME}/opensource/lib":"${MX_SDK_HOME}/opensource/lib64":"/usr/local/Ascend/ascend-toolkit/latest/acllib/lib64":${LD_LIBRARY_PATH}
export GST_PLUGIN_SCANNER="${MX_SDK_HOME}/opensource/libexec/gstreamer-1.0/gst-plugin-scanner"
export GST_PLUGIN_PATH="${MX_SDK_HOME}/opensource/lib/gstreamer-1.0":"${MX_SDK_HOME}/lib/plugins"

# compile
g++ main.cpp -I "${MX_SDK_HOME}/include/" -I "${MX_SDK_HOME}/opensource/include/" -L "${MX_SDK_HOME}/lib/" -L "${MX_SDK_HOME}/opensource/lib/" -L "${MX_SDK_HOME}/opensource/lib64/" -std=c++11 -D_GLIBCXX_USE_CXX11_ABI=0 -Dgoogle=mindxsdk_private -fPIC -fstack-protector-all -g -Wl,-z,relro,-z,now,-z,noexecstack -pie -Wall -lglog -lmxbase -lmxpidatatype -lplugintoolkit -lstreammanager -lcpprest -lmindxsdk_protobuf -o main

# run
./main
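run.sh leaves you with the two text matrices only; eval.py computes metrics exclusively on the SDK path. If you want to score the mxBase dumps the same way, the sketch below (not part of the shipped scripts; paths are relative to infer/mxbase, dimensions as above) rebuilds the rating matrix exactly as eval.py does:

```python
import numpy as np

# Text matrices written by ./main: one row per user/item, 192 columns each.
user_rep = np.loadtxt('../output/mxbase_user_rep.txt')   # (7068, 192)
item_rep = np.loadtxt('../output/mxbase_item_rep.txt')   # (3570, 192)

# Same scoring rule eval.py applies to the SDK outputs:
rating_preds = user_rep @ item_rep.T                     # (7068, 3570)
top20 = np.argsort(-rating_preds, axis=1)[:, :20]        # top-20 item ids per user
```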
diff --git a/official/gnn/bgcf/om_infer/infer/sdk/eval.py b/official/gnn/bgcf/om_infer/infer/sdk/eval.py
new file mode 100644
index 0000000000000000000000000000000000000000..9175912e02e380bcfb634c72eabaeff0a7ea4590
--- /dev/null
+++ b/official/gnn/bgcf/om_infer/infer/sdk/eval.py
@@ -0,0 +1,320 @@
# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================

"""
Sample script of BGCF inference with accuracy validation, using the MindX SDK inside Docker.
"""
import argparse
import heapq
import math
import os
import time
from multiprocessing import Pool
import pickle as pkl

import numpy as np
import MxpiDataType_pb2 as MxpiDataType
from StreamManagerApi import StreamManagerApi, MxDataInput, InProtobufVector, \
    MxProtobufIn, StringVector


def parse_args():
    """set and check parameters."""
    parser = argparse.ArgumentParser(description="bgcf process")
    parser.add_argument("--eval", type=str, default="../../data/eval", help="eval file")
    parser.add_argument("--pipeline", type=str, default="../../data/config/bgcf.pipeline", help="SDK infer pipeline")
    args_opt = parser.parse_args()
    return args_opt


args = parse_args()

num_user = 7068
num_item = 3570

row_neighs = 40
gnew_neighs = 20

Ks = [5, 10, 20, 100]

with open(args.eval + '/test_inputs.pkl', 'rb') as file:
    test_inputs = pkl.load(file)
with open(args.eval + '/test_set.pkl', 'rb') as file:
    test_set = pkl.load(file)
with open(args.eval + '/train_set.pkl', 'rb') as file:
    train_set = pkl.load(file)
with open(args.eval + '/item_deg_dict.pkl', 'rb') as file:
    item_deg_dict = pkl.load(file)
with open(args.eval + '/item_full_set.pkl', 'rb') as file:
    item_full_set = pkl.load(file, encoding="...")

# Flatten the seven test inputs to the [1, k] layouts the exported OM model expects.
test_input = test_inputs[0]
print(type(test_inputs[0]))
print(type(test_input[0]))
users = test_input[0].reshape(1, num_user)
items = test_input[1].reshape(1, num_item)
neg_items = test_input[2].reshape(1, num_item)
u_test_neighs = test_input[3].reshape([1, num_user * row_neighs])
u_test_gnew_neighs = test_input[4].reshape([1, num_user * gnew_neighs])
i_test_neighs = test_input[5].reshape([1, num_item * row_neighs])
i_test_gnew_neighs = test_input[6].reshape([1, num_item * gnew_neighs])

# (The dataset/*.txt dumps of these arrays are produced by infer/mxbase/predata.py.)


def send_source_data(appsrc_id, filename, stream_name, stream_manager, shape, tp):
    """
    Construct the input of the stream,
    send inputs data to a specified stream based on streamName.

    Returns:
        bool: send data success or not
    """
    # Note: 'filename' actually carries the numpy array to send, not a path.
    tensors = (filename).astype(np.int32)
    tensor_package_list = MxpiDataType.MxpiTensorPackageList()
    tensor_package = tensor_package_list.tensorPackageVec.add()
    data_input = MxDataInput()
    tensor_vec = tensor_package.tensorVec.add()
    tensor_vec.deviceId = 0
    tensor_vec.memType = 0
    for i in shape:
        tensor_vec.tensorShape.append(i)
    print(" shape :", tensor_vec.tensorShape)
    array_bytes = tensors.tobytes()
    data_input.data = array_bytes
    tensor_vec.dataStr = data_input.data
    tensor_vec.tensorDataSize = len(array_bytes)
    key = "appsrc{}".format(appsrc_id).encode('utf-8')
    protobuf_vec = InProtobufVector()
    protobuf = MxProtobufIn()
    protobuf.key = key
    protobuf.type = b'MxTools.MxpiTensorPackageList'
    protobuf.protobuf = tensor_package_list.SerializeToString()
    protobuf_vec.push_back(protobuf)
    ret = stream_manager.SendProtobuf(stream_name, appsrc_id, protobuf_vec)
    if ret < 0:
        print("Failed to send data to stream.")
        return False
    print("Send successfully!")
    return True


def send_appsrc_data(appsrc_id, file_name, stream_name, stream_manager, shape, tp):
    """
    Send one of BGCF's seven input tensors (users, items, neg_items and the
    four neighbor arrays) to its appsrc plugin.

    Returns:
        bool: send data success or not
    """
    if not send_source_data(appsrc_id, file_name, stream_name, stream_manager, shape, tp):
        return False
    return True


def idcg_k(actual, k):
    """Calculates the ideal discounted cumulative gain at k"""
    res = sum([1.0 / math.log(i + 2, 2) for i in range(min(k, len(actual)))])
    return 1.0 if not res else res


def ndcg_k(actual, predicted, topk):
    """Calculates the normalized discounted cumulative gain at k"""
    idcg = idcg_k(actual, topk)
    res = 0

    dcg_k = sum([int(predicted[j] in set(actual)) / math.log(j + 2, 2) for j in range(topk)])
    res += dcg_k / idcg
    return res


def recall_at_k_2(r, k, all_pos_num):
    """Calculates the recall at k"""
    r = np.asfarray(r)[:k]
    return np.sum(r) / all_pos_num


def novelty_at_k(topk_items, item_degree_dict, num_user, k):
    """Calculate the novelty at k"""
    avg_nov = []
    for item in topk_items[:k]:
        avg_nov.append(-np.log2((item_degree_dict[item] + 1e-8) / num_user))
    return np.mean(avg_nov)


def ranklist_by_heapq(user_pos_test, test_items, rating, Ks):
    """Return the n largest score from the item_score by heap algorithm"""
    item_score = {}
    for i in test_items:
        item_score[i] = rating[i]

    K_max = max(Ks)
    K_max_item_score = heapq.nlargest(K_max, item_score, key=item_score.get)

    r = []
    for i in K_max_item_score:
        if i in user_pos_test:
            r.append(1)
        else:
            r.append(0)
    return r, K_max_item_score


def get_performance(user_pos_test, r, K_max_item, item_degree_dict, num_user, Ks):
    """Wraps the model metrics"""
    recall, ndcg, novelty = [], [], []
    for K in Ks:
        recall.append(recall_at_k_2(r, K, len(user_pos_test)))
        ndcg.append(ndcg_k(user_pos_test, K_max_item, K))
        novelty.append(novelty_at_k(K_max_item, item_degree_dict, num_user, K))
    return {'recall': np.array(recall), 'ndcg': np.array(ndcg), 'nov': np.array(novelty)}


def test_one_user(x):
    """Calculate one user metrics"""
    rating = x[0]
    u = x[1]

    training_items = train_set[u]

    user_pos_test = test_set[u]

    all_items = set(range(num_item))

    test_items = list(all_items - set(training_items))

    r, k_max_items = ranklist_by_heapq(user_pos_test, test_items, rating, Ks)

    return get_performance(user_pos_test, r, k_max_items, item_deg_dict, num_user, Ks), \
+ [k_max_items[:Ks[x]] for x in range(len(Ks))] + +def run(): + """ + read pipeline and do infer + """ + # init stream manager + stream_manager_api = StreamManagerApi() + ret = stream_manager_api.InitManager() + if ret != 0: + print("Failed to init Stream manager, ret=%s" % str(ret)) + return + + # create streams by pipeline config file + with open(os.path.realpath(args.pipeline), 'rb') as f: + pipeline_str = f.read() + ret = stream_manager_api.CreateMultipleStreams(pipeline_str) + if ret != 0: + print("Failed to create Stream, ret=%s" % str(ret)) + return + + stream_name = b'bgcf_gnn' + infer_total_time = 0 + + if not send_appsrc_data(0, users, stream_name, stream_manager_api, users.shape, np.int64): + return + if not send_appsrc_data(1, items, stream_name, stream_manager_api, items.shape, np.int32): + return + if not send_appsrc_data(2, neg_items, stream_name, stream_manager_api, neg_items.shape, np.int32): + return + if not send_appsrc_data(3, u_test_neighs, stream_name, stream_manager_api, u_test_neighs.shape, np.int32): + return + if not send_appsrc_data(4, u_test_gnew_neighs, stream_name, stream_manager_api, u_test_gnew_neighs.shape, np.int32): + return + if not send_appsrc_data(5, i_test_neighs, stream_name, stream_manager_api, i_test_neighs.shape, np.int32): + return + if not send_appsrc_data(6, i_test_gnew_neighs, stream_name, stream_manager_api, i_test_gnew_neighs.shape, np.int32): + return + # Obtain the inference result by specifying streamName and uniqueId. + key_vec = StringVector() + key_vec.push_back(b'mxpi_tensorinfer0') + start_time = time.time() + infer_result = stream_manager_api.GetProtobuf(stream_name, 0, key_vec) + infer_total_time += time.time() - start_time + if infer_result.size() == 0: + print("inferResult is null") + return + if infer_result[0].errorCode != 0: + print("GetProtobuf error. 
errorCode=%d" % (infer_result[0].errorCode)) + return + result = MxpiDataType.MxpiTensorPackageList() + result.ParseFromString(infer_result[0].messageBuf) + user_rep = np.frombuffer(result.tensorPackageVec[0].tensorVec[0].dataStr, dtype=np.float16).reshape(7068,192) + print(user_rep.shape) + + item_rep = np.frombuffer(result.tensorPackageVec[0].tensorVec[1].dataStr, dtype=np.float16).reshape(3570,192) + print(item_rep.shape) + + """Evaluation with user and item rep""" + result = {'recall': np.zeros(len(Ks)), 'ndcg': np.zeros(len(Ks)), + 'nov': np.zeros(len(Ks))} + pool = Pool(8) + user_indexes = np.arange(num_user) + + rating_preds = user_rep @ item_rep.transpose() + user_rating_uid = zip(rating_preds, user_indexes) + all_result = pool.map(test_one_user, user_rating_uid) + + top20 = [] + + for re in all_result: + result['recall'] += re[0]['recall'] / num_user + result['ndcg'] += re[0]['ndcg'] / num_user + result['nov'] += re[0]['nov'] / num_user + top20.append(re[1][2]) + + pool.close() + + sedp = [[] for i in range(len(Ks) - 1)] + + num_all_links = np.sum([len(x) for x in item_full_set]) + + for k in range(len(Ks) - 1): + for u in range(num_user): + diff = [] + pred_items_at_k = all_result[u][1][k] + for item in pred_items_at_k: + if item in test_set[u]: + avg_prob_all_user = len(item_full_set[item]) / num_all_links + diff.append(max((Ks[k] - pred_items_at_k.index(item) - 1) + / (Ks[k] - 1) - avg_prob_all_user, 0)) + one_user_sedp = sum(diff) / Ks[k] + sedp[k].append(one_user_sedp) + + sedp = np.array(sedp).mean(1) + + test_recall_bgcf, test_ndcg_bgcf, \ + test_sedp, test_nov = result['recall'].tolist(), result['ndcg'].tolist(), \ + [sedp[1], sedp[2]], result['nov'].tolist() + _epoch = 600 + print( + 'epoch:%03d, recall_@10:%.5f, recall_@20:%.5f, ndcg_@10:%.5f, ndcg_@20:%.5f, ' + 'sedp_@10:%.5f, sedp_@20:%.5f, nov_@10:%.5f, nov_@20:%.5f\n' % (_epoch, + test_recall_bgcf[1], + test_recall_bgcf[2], + test_ndcg_bgcf[1], + test_ndcg_bgcf[2], + test_sedp[0], + test_sedp[1], + test_nov[1], + test_nov[2])) + stream_manager_api.DestroyAllStreams() + + + + +if __name__ == '__main__': + run() + diff --git a/official/gnn/bgcf/om_infer/infer/sdk/eval.sh b/official/gnn/bgcf/om_infer/infer/sdk/eval.sh new file mode 100644 index 0000000000000000000000000000000000000000..57c3dd5243e3fab9ea72b473a2b16d45ca3b217d --- /dev/null +++ b/official/gnn/bgcf/om_infer/infer/sdk/eval.sh @@ -0,0 +1,34 @@ +#!/bin/bash + +# Copyright 2021 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 

set -e

CUR_PATH=$(cd "$(dirname "$0")" || { warn "Failed to check path/to/eval.sh" ; exit ; } ; pwd)

# Simple log helper functions
info() { echo -e "\033[1;34m[INFO ][MxStream] $1\033[1;37m" ; }
warn() { echo >&2 -e "\033[1;31m[WARN ][MxStream] $1\033[1;37m" ; }

export MX_SDK_HOME=/home/zju_mindx/SDK_2.0.2/mxManufacture
export LD_LIBRARY_PATH=${MX_SDK_HOME}/lib:${MX_SDK_HOME}/opensource/lib:${MX_SDK_HOME}/opensource/lib64:/usr/local/Ascend/ascend-toolkit/latest/acllib/lib64:${LD_LIBRARY_PATH}
export GST_PLUGIN_SCANNER=${MX_SDK_HOME}/opensource/libexec/gstreamer-1.0/gst-plugin-scanner
export GST_PLUGIN_PATH=${MX_SDK_HOME}/opensource/lib/gstreamer-1.0:${MX_SDK_HOME}/lib/plugins

# set PYTHONPATH so that StreamManagerApi.py can be imported
export PYTHONPATH=$PYTHONPATH:${MX_SDK_HOME}/python

python3.7 eval.py
exit 0
diff --git a/official/gnn/bgcf/om_infer/infer/sdk/infer.py b/official/gnn/bgcf/om_infer/infer/sdk/infer.py
new file mode 100644
index 0000000000000000000000000000000000000000..fb624bad74ab9207d31aaf0c0bd9db4f69b1612c
--- /dev/null
+++ b/official/gnn/bgcf/om_infer/infer/sdk/infer.py
@@ -0,0 +1,284 @@
# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================

"""
Sample script of BGCF inference using the MindX SDK inside Docker.
"""
import argparse
import heapq
import math
import os
import time
from multiprocessing import Pool
import pickle as pkl

import numpy as np
import MxpiDataType_pb2 as MxpiDataType
from StreamManagerApi import StreamManagerApi, MxDataInput, InProtobufVector, \
    MxProtobufIn, StringVector


def parse_args():
    """set and check parameters."""
    parser = argparse.ArgumentParser(description="bgcf process")
    parser.add_argument("--eval", type=str,
                        default="../../data/eval", help="eval file")
    parser.add_argument("--pipeline", type=str,
                        default="../../data/config/bgcf.pipeline", help="SDK infer pipeline")
    args_opt = parser.parse_args()
    return args_opt


args = parse_args()

num_user = 7068
num_item = 3570

row_neighs = 40
gnew_neighs = 20
Ks = [5, 10, 20, 100]

with open(args.eval + '/test_inputs.pkl', 'rb') as file:
    test_inputs = pkl.load(file)
with open(args.eval + '/test_set.pkl', 'rb') as file:
    test_set = pkl.load(file)
with open(args.eval + '/train_set.pkl', 'rb') as file:
    train_set = pkl.load(file)
with open(args.eval + '/item_deg_dict.pkl', 'rb') as file:
    item_deg_dict = pkl.load(file)
with open(args.eval + '/item_full_set.pkl', 'rb') as file:
    item_full_set = pkl.load(file, encoding="...")


# Flatten the seven test inputs to the [1, k] layouts the exported OM model expects.
test_input = test_inputs[0]
print(type(test_inputs[0]))
print(type(test_input[0]))
users = test_input[0].reshape(1, num_user)
items = test_input[1].reshape(1, num_item)
neg_items = test_input[2].reshape(1, num_item)
u_test_neighs = test_input[3].reshape([1, num_user*row_neighs])
u_test_gnew_neighs =
test_input[4].reshape([1, num_user*gnew_neighs]) +i_test_neighs = test_input[5].reshape([1, num_item*row_neighs]) +i_test_gnew_neighs = test_input[6].reshape([1, num_item*gnew_neighs]) + +# np.set_printoptions(suppress=True) +# np.savetxt("dataset/users.txt",users,fmt='%d') +# np.savetxt("dataset/items.txt",items,fmt='%d') +# np.savetxt("dataset/neg_items.txt",neg_items,fmt='%d') +# np.savetxt("dataset/u_test_neighs.txt",u_test_neighs,fmt='%d') +# np.savetxt("dataset/u_test_gnew_neighs.txt",u_test_gnew_neighs,fmt='%d') +# np.savetxt("dataset/i_test_neighs.txt",i_test_neighs,fmt='%d') +# np.savetxt("dataset/i_test_gnew_neighs.txt",i_test_gnew_neighs,fmt='%d') + + +def send_source_data(appsrc_id, filename, stream_name, stream_manager, shape, tp): + """ + Construct the input of the stream, + send inputs data to a specified stream based on streamName. + + Returns: + bool: send data success or not + """ + tensors = (filename).astype(np.int32) + tensor_package_list = MxpiDataType.MxpiTensorPackageList() + tensor_package = tensor_package_list.tensorPackageVec.add() + data_input = MxDataInput() + tensor_vec = tensor_package.tensorVec.add() + tensor_vec.deviceId = 0 + tensor_vec.memType = 0 + for i in shape: + tensor_vec.tensorShape.append(i) + print(" shape :", tensor_vec.tensorShape) + array_bytes = tensors.tobytes() + data_input.data = array_bytes + tensor_vec.dataStr = data_input.data + tensor_vec.tensorDataSize = len(array_bytes) + key = "appsrc{}".format(appsrc_id).encode('utf-8') + protobuf_vec = InProtobufVector() + protobuf = MxProtobufIn() + protobuf.key = key + protobuf.type = b'MxTools.MxpiTensorPackageList' + protobuf.protobuf = tensor_package_list.SerializeToString() + protobuf_vec.push_back(protobuf) + ret = stream_manager.SendProtobuf(stream_name, appsrc_id, protobuf_vec) + if ret < 0: + print("Failed to send data to stream.") + return False + else: + print("Send successfully!") + return True + + +def send_appsrc_data(appsrc_id, file_name, stream_name, stream_manager, shape, tp): + """ + send three stream to infer model, include input ids, input mask and token type_id. 
+ + Returns: + bool: send data success or not + """ + if not send_source_data(appsrc_id, file_name, stream_name, stream_manager, shape, tp): + return False + return True + + +def idcg_k(actual, k): + """Calculates the ideal discounted cumulative gain at k""" + res = sum([1.0 / math.log(i + 2, 2) for i in range(min(k, len(actual)))]) + return 1.0 if not res else res + + +def ndcg_k(actual, predicted, topk): + """Calculates the normalized discounted cumulative gain at k""" + idcg = idcg_k(actual, topk) + res = 0 + + dcg_k = sum([int(predicted[j] in set(actual)) / math.log(j + 2, 2) + for j in range(topk)]) + res += dcg_k / idcg + return res + + +def recall_at_k_2(r, k, all_pos_num): + """Calculates the recall at k""" + r = np.asfarray(r)[:k] + return np.sum(r) / all_pos_num + + +def novelty_at_k(topk_items, item_degree_dict, num_user, k): + """Calculate the novelty at k""" + avg_nov = [] + for item in topk_items[:k]: + avg_nov.append(-np.log2((item_degree_dict[item] + 1e-8) / num_user)) + return np.mean(avg_nov) + + +def ranklist_by_heapq(user_pos_test, test_items, rating, Ks): + """Return the n largest score from the item_score by heap algorithm""" + item_score = {} + for i in test_items: + item_score[i] = rating[i] + + K_max = max(Ks) + K_max_item_score = heapq.nlargest(K_max, item_score, key=item_score.get) + + r = [] + for i in K_max_item_score: + if i in user_pos_test: + r.append(1) + else: + r.append(0) + return r, K_max_item_score + + +def get_performance(user_pos_test, r, K_max_item, item_degree_dict, num_user, Ks): + """Wraps the model metrics""" + recall, ndcg, novelty = [], [], [] + for K in Ks: + recall.append(recall_at_k_2(r, K, len(user_pos_test))) + ndcg.append(ndcg_k(user_pos_test, K_max_item, K)) + novelty.append(novelty_at_k(K_max_item, item_degree_dict, num_user, K)) + return {'recall': np.array(recall), 'ndcg': np.array(ndcg), 'nov': np.array(novelty)} + + +def test_one_user(x): + """Calculate one user metrics""" + rating = x[0] + u = x[1] + + training_items = train_set[u] + + user_pos_test = test_set[u] + + all_items = set(range(num_item)) + + test_items = list(all_items - set(training_items)) + + r, k_max_items = ranklist_by_heapq(user_pos_test, test_items, rating, Ks) + + return get_performance(user_pos_test, r, k_max_items, item_deg_dict, num_user, Ks), \ + [k_max_items[:Ks[x]] for x in range(len(Ks))] + + +def run(): + """ + read pipeline and do infer + """ + # init stream manager + stream_manager_api = StreamManagerApi() + ret = stream_manager_api.InitManager() + if ret != 0: + print("Failed to init Stream manager, ret=%s" % str(ret)) + return + + # create streams by pipeline config file + with open(os.path.realpath(args.pipeline), 'rb') as f: + pipeline_str = f.read() + ret = stream_manager_api.CreateMultipleStreams(pipeline_str) + if ret != 0: + print("Failed to create Stream, ret=%s" % str(ret)) + return + + stream_name = b'bgcf_gnn' + infer_total_time = 0 + + if not send_appsrc_data(0, users, stream_name, stream_manager_api, users.shape, np.int64): + return + if not send_appsrc_data(1, items, stream_name, stream_manager_api, items.shape, np.int32): + return + if not send_appsrc_data(2, neg_items, stream_name, stream_manager_api, neg_items.shape, np.int32): + return + if not send_appsrc_data(3, u_test_neighs, stream_name, stream_manager_api, u_test_neighs.shape, np.int32): + return + if not send_appsrc_data(4, u_test_gnew_neighs, stream_name, stream_manager_api, u_test_gnew_neighs.shape, np.int32): + return + if not send_appsrc_data(5, i_test_neighs, stream_name, 
stream_manager_api, i_test_neighs.shape, np.int32): + return + if not send_appsrc_data(6, i_test_gnew_neighs, stream_name, stream_manager_api, i_test_gnew_neighs.shape, np.int32): + return + # Obtain the inference result by specifying streamName and uniqueId. + key_vec = StringVector() + key_vec.push_back(b'mxpi_tensorinfer0') + start_time = time.time() + infer_result = stream_manager_api.GetProtobuf(stream_name, 0, key_vec) + infer_total_time += time.time() - start_time + if infer_result.size() == 0: + print("inferResult is null") + return + if infer_result[0].errorCode != 0: + print("GetProtobuf error. errorCode=%d" % (infer_result[0].errorCode)) + return + result = MxpiDataType.MxpiTensorPackageList() + result.ParseFromString(infer_result[0].messageBuf) + user_rep = np.frombuffer( + result.tensorPackageVec[0].tensorVec[0].dataStr, dtype=np.float16).reshape(7068, 192) + print("success!!") + print("result: user_rep shape ") + print(user_rep.shape) + + item_rep = np.frombuffer( + result.tensorPackageVec[0].tensorVec[1].dataStr, dtype=np.float16).reshape(3570, 192) + print("result: item_rep shape ") + print(item_rep.shape) + + stream_manager_api.DestroyAllStreams() + + +if __name__ == '__main__': + run() + diff --git a/official/gnn/bgcf/om_infer/infer/sdk/infer.sh b/official/gnn/bgcf/om_infer/infer/sdk/infer.sh new file mode 100644 index 0000000000000000000000000000000000000000..2adbd549007545f43512c84e3f6b7f839b2be071 --- /dev/null +++ b/official/gnn/bgcf/om_infer/infer/sdk/infer.sh @@ -0,0 +1,35 @@ +#!/bin/bash + +# Copyright 2021 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 

set -e

CUR_PATH=$(cd "$(dirname "$0")" || { warn "Failed to check path/to/infer.sh" ; exit ; } ; pwd)

# Simple log helper functions
info() { echo -e "\033[1;34m[INFO ][MxStream] $1\033[1;37m" ; }
warn() { echo >&2 -e "\033[1;31m[WARN ][MxStream] $1\033[1;37m" ; }

export MX_SDK_HOME=/home/zju_mindx/SDK_2.0.2/mxManufacture
export LD_LIBRARY_PATH=${MX_SDK_HOME}/lib:${MX_SDK_HOME}/opensource/lib:${MX_SDK_HOME}/opensource/lib64:/usr/local/Ascend/ascend-toolkit/latest/acllib/lib64:${LD_LIBRARY_PATH}
export GST_PLUGIN_SCANNER=${MX_SDK_HOME}/opensource/libexec/gstreamer-1.0/gst-plugin-scanner
export GST_PLUGIN_PATH=${MX_SDK_HOME}/opensource/lib/gstreamer-1.0:${MX_SDK_HOME}/lib/plugins

# set PYTHONPATH so that StreamManagerApi.py can be imported
export PYTHONPATH=$PYTHONPATH:${MX_SDK_HOME}/python

python3.7 infer.py
exit 0
diff --git a/official/gnn/bgcf/src/callback.py b/official/gnn/bgcf/src/callback.py
index 67a73dae20949977f716aa42b3479456cd687b70..6cfb46df9b11f3dfde5254346efe04e9aa3ab27f 100644
--- a/official/gnn/bgcf/src/callback.py
+++ b/official/gnn/bgcf/src/callback.py
@@ -24,15 +24,17 @@ from mindspore.ops import functional as F
 from mindspore.ops import operations as P
 from mindspore.common import dtype as mstype
 from mindspore.common.parameter import ParameterTuple
-
+import pickle
 from src.utils import convert_item_id
-
+import sys
+sys.path.append("..")
+from model_utils.config import config
 
 def TestBGCF(forward_net, num_user, num_item, input_dim, test_graph_dataset):
     """BGCF test wrapper"""
     user_reps = np.zeros([num_user, input_dim * 3])
     item_reps = np.zeros([num_item, input_dim * 3])
-
+    test_inputs = []
     for _ in range(50):
         test_graph_dataset.random_select_sampled_graph()
         u_test_neighs, u_test_gnew_neighs = test_graph_dataset.get_user_sapmled_neighbor()
@@ -46,6 +48,15 @@ def TestBGCF(forward_net, num_user, num_item, input_dim, test_graph_dataset):
         users = Tensor(np.arange(num_user).reshape(-1,), mstype.int32)
         items = Tensor(np.arange(num_item).reshape(-1,), mstype.int32)
         neg_items = Tensor(np.arange(num_item).reshape(-1, 1), mstype.int32)
+
+        test_input = [users.asnumpy(),
+                      items.asnumpy(),
+                      neg_items.asnumpy(),
+                      u_test_neighs.asnumpy(),
+                      u_test_gnew_neighs.asnumpy(),
+                      i_test_neighs.asnumpy(),
+                      i_test_gnew_neighs.asnumpy()]
+        test_inputs.append(test_input)
 
         user_rep, item_rep = forward_net(users,
                                          items,
@@ -58,6 +69,9 @@ def TestBGCF(forward_net, num_user, num_item, input_dim, test_graph_dataset):
 
         user_reps += user_rep.asnumpy()
         item_reps += item_rep.asnumpy()
+    if config.device_target == "Ascend":
+        with open(config.output_path + '/test_inputs.pkl', 'wb') as file:
+            pickle.dump(test_inputs, file)
     user_reps /= 50
     item_reps /= 50
     return user_reps, item_reps
@@ -73,6 +87,15 @@ class ForwardBGCF(nn.Cell):
 
     def construct(self, users, items, neg_items, u_neighs, u_gnew_neighs, i_neighs, i_gnew_neighs):
         """Calculate the user and item representation"""
+        # Gowalla dimensions: restore the flattened [1, N*k] export inputs to their 2-D layouts
+        neg_items = neg_items.view(3570, 1)
+        u_neighs = u_neighs.view(7068, 40)
+        u_gnew_neighs = u_gnew_neighs.view(7068, 20)
+        i_neighs = i_neighs.view(3570, 40)
+        i_gnew_neighs = i_gnew_neighs.view(3570, 20)
+        if len(users.shape) == 2:
+            users = users[0]
+            items = items[0]
         _, user_rep, _, item_rep, _, _, = self.network(users,
                                                        items,
                                                        neg_items,
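The pickle.dump in TestBGCF is what feeds predata.py and the SDK scripts. A quick way to confirm a dump is usable is sketched below; it assumes config.output_path points at an `outputs` directory (hypothetical here) and the Gowalla shapes used elsewhere in this patch:

```python
import pickle

# 'outputs' stands in for whatever config.output_path is set to.
with open('outputs/test_inputs.pkl', 'rb') as f:
    test_inputs = pickle.load(f)

print(len(test_inputs))        # 50 sampled graphs, one entry per TestBGCF iteration
users, items, neg_items, u_n, u_g, i_n, i_g = test_inputs[0]
print(u_n.shape, i_g.shape)    # typically (7068, 40) and (3570, 20), or flattened equivalents
```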
diff --git a/official/gnn/bgcf/src/metrics.py b/official/gnn/bgcf/src/metrics.py
index 6c0024ab0be6556c91c5fac102f00bc371ebf96a..abe4cd67f775e41f7b6b75e65d2cc098a746de8a 100644
--- a/official/gnn/bgcf/src/metrics.py
+++ b/official/gnn/bgcf/src/metrics.py
@@ -18,11 +18,13 @@ Recommendation metrics
 import math
 import heapq
 from multiprocessing import Pool
-
+import os
 import numpy as np
-
+import pickle
 from src.utils import convert_item_id
-
+import sys
+sys.path.append("..")
+from model_utils.config import config
 
 def ndcg_k(actual, predicted, topk):
     """Calculates the normalized discounted cumulative gain at k"""
@@ -142,6 +144,18 @@ class BGCFEvaluate:
                [k_max_items[:self.Ks[x]] for x in range(len(self.Ks))]
 
     def eval_with_rep(self, user_rep, item_rep, parser):
         """Evaluation with user and item rep"""
+
+        if config.device_target == "Ascend":
+            with open(config.output_path + '/test_set.pkl', 'wb') as file:
+                pickle.dump(self.test_set, file)
+            with open(config.output_path + '/train_set.pkl', 'wb') as file:
+                pickle.dump(self.train_set, file)
+
+            with open(config.output_path + '/item_deg_dict.pkl', 'wb') as file:
+                pickle.dump(self.item_deg_dict, file)
+            with open(config.output_path + '/item_full_set.pkl', 'wb') as file:
+                pickle.dump(self.item_full_set, file)
+
         result = {'recall': np.zeros(len(self.Ks)), 'ndcg': np.zeros(len(self.Ks)),
                   'nov': np.zeros(len(self.Ks))}
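Since the same OM model is served by two runtimes, it is worth cross-checking their outputs once. The sketch below assumes you saved the SDK-side user representation from infer.py with np.save (sdk_user_rep.npy is hypothetical, not produced by the shipped scripts) and compares it against the mxBase text dump:

```python
import numpy as np

mx_user = np.loadtxt('infer/output/mxbase_user_rep.txt')   # (7068, 192), written by ./main

# Hypothetical file: add np.save('sdk_user_rep.npy', user_rep) to infer.py to produce it.
sdk_user = np.load('sdk_user_rep.npy').astype(np.float64)

# Both decode the same float16 tensor, so differences should only reflect
# the 6-significant-digit text printing in main.cpp.
print(np.abs(mx_user - sdk_user).max())
assert np.allclose(mx_user, sdk_user, atol=1e-3)
```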