# async **Repository Path**: oldyang1983/async ## Basic Information - **Project Name**: async - **Description**: No description available - **Primary Language**: C++ - **License**: Not specified - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2024-01-13 - **Last Updated**: 2024-01-13 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README c++异步io库 目标: 1)提供异步访问的通用底层支持(类似node语言里的promise机制)及内置一些常用的io接口 目前已支持的io接口:redis、mysql、http、mongo、rabbitmq、zookeeper 未来将支持的io接口: websocket 2)提供类似服务器进程脚手架的功能,供快速搭建服务器架构 编译: 1、 如需使用redis模块,则cmake时需定义宏USE_ASYNC_REDIS,如需使用集群则需要定义宏USE_ASYNC_CLUSTER_REDIS。如:cmake .. -DUSE_ASYNC_REDIS=1 -DUSE_ASYNC_CLUSTER_REDIS=1 redis模块依赖:libevent、hiredis_cluster、hiredis库 2、 如需使用mongo模块,则cmake时需定义宏USE_ASYNC_MONGO。如:cmake .. -DUSE_ASYNC_MONGO=1 monogo模块依赖: bson-1.0、mongoc-1.0库 3、 如需使用libcurl模块,则cmake时需定义宏USE_ASYNC_CURL。如:cmake .. -DUSE_ASYNC_CURL=1 libcurl模块依赖: libcurl库 4、 如需使用mysql模块,则cmake时需定义宏USE_ASYNC_MYSQL。如:cmake .. -DUSE_ASYNC_MYSQL=1 mysql模块依赖:libmysqlclient8.0库 5、 如需使用ipc模块,则cmake时需定义宏USE_IPC。如:cmake .. -DUSE_IPC=1 ipc模块依赖:zeromq库 6、 如需使用transaction模块,则cmake时需定义宏USE_TRANS。如:cmake .. -DUSE_TRANS=1 transaction模块依赖:无依赖 7、 如需使用net模块,则cmake时需定义宏USE_NET。如: cmake .. -DUSE_NET=1 net模块依赖:libevent 8、 如需使用rabbitmq模块,则cmake时需要定义宏USE_ASYNC_RABBITMQ。如:cmake .. -DUSE_ASYNC_RABBITMQ=1 rabbitmq模块依赖:librabbitmq库 9、 如需使用zookeeper模块,则cmake时需要定义宏USE_ASYNC_ZOOKEEPER。如:cmake .. -DUSE_ASYNC_ZOOKEEPER=1 zookeeper模块依赖:zookeeper-c client的单线程版本 20、serve模块,分三子模块 1)libserve模块,提供基础服务操作。提供了默认的后端进程间通信架构及消息处理样式 2)router模块,提供默认的后端进程通信需用到的路由功能 3)gate模块,一个网关服务进程的模板 21、cmake例子: 1)根目录下执行:mkdir -p build 2)cd build 3)cmake .. -DUSE_ASYNC_CURL=1 -DUSE_ASYNC_REDIS=1 -DUSE_ASYNC_CLUSTER_REDIS=1 -DUSE_ASYNC_MONGO=1 \ -DUSE_ASYNC_MYSQL=1 -DUSE_ASYNC_RABBITMQ=1 -DUSE_ASYNC_ZOOKEEPER=1 -DUSE_IPC=1 -DUSE_TRANS=1 -DUSE_NET=1 -DCMAKE_BUILD_TYPE=Debug 4)输出文件: 库文件:build/bin/libasync.so 头文件: build/include/ 依赖库安装: 如果库安装路径、链接版本、头文件等出现报错,可以根据编译环境调整 common/CMakeLists.txt文件 centos安装依赖库: 1、libevent: 1)wget https://github.com/libevent/libevent/releases/download/release-2.1.8-stable/libevent-2.1.8-stable.tar.gz 2)tar zxvf libevent-2.1.8-stable.tar.gz 3)cd libevent-2.1.8-stable 4)./configure –prefix=/usr --libdir=/usr/local/lib64 5)make 6)make install 2、redis-server: 1)wget http://download.redis.io/releases/redis-5.0.9.tar.gz 2)tar -zxvf redis-5.0.9.tar.gz 3)cd redis-5.0.9 4)make 5)make install 3、hiredis: 1)https://github.com/redis/hiredis/releases下载源码 2)tar zxvf hiredis-1.1.0.tar.gz 3)cd hiredis-1.1.0 4)make 5)make install 4、安装hiredis_vip: 1)https://github.com/vipshop/hiredis-vip 下载源码 2)unzip hiredis-vip-master.zip 3)cd hiredis-vip-master 4)make 5)make install 4-2、安装hiredis-cluster: 1)https://github.com/Nordix/hiredis-cluster 下载源码 2)unzip hiredis-cluster-master.zip 3)cd hiredis-cluster-master 4)mkdir build 5)cd build 6)cmake .. // (需要cmake3.2版本) 5、zeromq: 1)https://github.com/zeromq/libzmq 下载源码 2)unzip libzmq-master.zip 3)cd libzmq-master 4)yum install -y gettext 5)./autogen.sh (如果报错,就安装yum install -y libtool) 6)./configure 7)make 8)make install 6、安装libmysqlclient8.0: 1)它可以随8.0的服务器一起安装 2)https://downloads.mysql.com/archives/community/ 在这个地址下载下来,然后分别拷贝头文件和库到/usr/include/mysql/、/usr/lib64/mysql/ 1)yum install -y mariadb-devel.x86_64 7、mysql-server8.0: 1)wget http://dev.mysql.com/get/mysql80-community-release-el7-5.noarch.rpm 2)rpm -ivh mysql80-community-release-el7-5.noarch.rpm 3)yum install mysql-community-server -y 4)systemctl restart mysqld 5)查看默认密码:cat /var/log/mysqld.log | grep password 6)登录修改密码: (1)mysql -uroot -p (2)alter user 'root'@'localhost' identified by 'Root@123456' 8、libcurl-c: 1)wget https://curl.se/download/curl-7.82.0.tar.gz 2)tar zxvf curl-7.82.0.tar.gz 3)./configure --prefix=/usr/local/ --without-ssl 4)make 5)make install 9、mongoc安装: 1)下载 wget https://github.com/mongodb/mongo-c-driver/releases/download/1.23.2/mongo-c-driver-1.23.2.tar.gz 2)mkdir cmake-build 3)cd cmake-build 4)cmake -DENABLE_AUTOMATIC_INIT_AND_CLEANUP=OFF .. 5)make 6)make install 10、rabbitmq-c安装: 1)下载 wget https://github.com/alanxz/rabbitmq-c/archive/refs/tags/v0.13.0.tar.gz 2)tar zxvf rabbitmq-c-0.13.0.tar.gz 3)mkdir build && cd build 4)cmake -DCMAKE_INSTALL_PREFIX=/usr/local -DENABLE_SSL_SUPPORT=OFF .. 5)make && make install 11、rabbitmq-server安装: 1)安装erlang: (1) wget https://packages.erlang-solutions.com/erlang-solutions-2.0-1.noarch.rpm (2) rpm -Uvh erlang-solutions-2.0-1.noarch.rpm (3) rpm --import https://packages.erlang-solutions.com/rpm/erlang_solutions.asc (4) vi /etc/yum.repos.d/erlang.repo [erlang-solutions] name=CentOS $releasever - $basearch - Erlang Solutions baseurl=https://packages.erlang-solutions.com/rpm/centos/$releasever/$basearch gpgcheck=1 gpgkey=https://packages.erlang-solutions.com/rpm/erlang_solutions.asc enabled=1 (5) yum install erlang 2)下载: rabbitmq-server-3.9.29-1.el8.noarch.rpm 在:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.9.29 3)yum install -y socat 4)rpm -ivh rabbitmq-server-3.9.29-1.el8.noarch.rpm 5)systemctl start rabbitmq-server 6)systemctl enable rabbitmq-server 7)启动管理插件:rabbitmq-plugins enable rabbitmq_management, systemctl stop firewalld, http://localhost:15672 8)创建用户,授权用户: rabbitmqctl add_user admin admin rabbitmqctl set_user_tags admin administrator 12、 zookeeper安装(依赖java): 1)下载:https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz 2)解压:tar zxvf apache-zookeeper-3.7.1-bin.tar.gz 3)cp -r apache-zookeeper-3.7.1-bin /usr/local/zookeeper 4)cd /usr/local/zookeeper 5)mkdir logs 6)mkdir data 7)vi ./conf/zoo.cfg tickTime = 2000 dataDir = /usr/local/zookeeper/data dataLogDir = /usr/local/zookeeper/logs clientPort = 2181 initLimit = 5 syncLimit = 2 8)服务命令: cd ./bin ./zkServer.sh start ./zkServer.sh stop ./zkServer.sh restart ./zkServer.sh status 9)客户端命令: ./zkCli.sh 13、zookeeper-c安装: 依赖工具: yum install -y cppunit-devel yum install -y ant 1)下载:https://github.com/apache/zookeeper 2)进入top目录,运行ant compile_jute, 如果报错没有build.xml文件,需要去3。5.*的版本将build.xml和ivy开头的文件拷贝过来 3)进入zookeeper-client/zookeeper-client-c目录 4)autoreconf -if 5)./configure 6)make && make install 目录结构: example example.cpp ----> 测试用例 CMakeLists.txt ----> 编译文件 async ----> 库目录 threads ----> 线程池目录 thread_pool.h ----> 线程池类 coroutine ----> 协程目录 coroutine_task.h/cpp ----> 协程池类文件 coroutine.h/cpp ----> 协程 async ----> 异步库目录 cpu ----> 异步cpu运算实现 curl ----> 异步http实现 mongo ----> 异步mongo实现 mysql ----> 异步mysql实现 redis ----> 异步redis实现 rabbitmq ----> 异步rabbitmq实现 zookeeper ----> 异步zookeeper实现 co_async ----> 协程与异步io的结合 promise ----> 支持将异步操作变成同步操作,有点类似于js中的promise cpu ----> async::cpu与promise的结合 curl ----> async::curl与promise的结合 mongo ----> async::mongo与promise的结合 mysql ----> async::mysql与promise的结合 redis ----> async::redis与promise的结合 rabbitmq ----> async::rabbitmq与promise的结合 zookeeper ----> async::zookeeper与promise的结合 ipc ----> 用以提供进程间通信时协程式的send接口 ipc ----> 提供进程间的通信,适合用于后端服务之间的通信。使用zeromq库作为实现者 transaction ----> 提供以协程方式运行的包处理接口 net ----> 提供tcp server/client接口,udp server/client接口,http server接口 serve ----> 目标是提供一个服务器进程脚手架,可以快速搭建后端服务,还提供一个通用的route进程。 它会依赖common库,且common库需要使用-DUSE_TRANS编译。已实现大部分功能 serve ----> 编译输出libserve.so库 route ----> ./router -c router.json 启动路由,不同世界的路由互不通信 gate ----> 一个网关服案例工程,可以对它进行修改实现自定义网关服 test ----> 测试案例,起两个进程,这两个进程使用router做转发相互通信。 testhttp ----> 演绎如何监听http服务,并注册httptransaction处理业务 层次结构:co表示coroutine协程 co_async _______________________________________________________________________ | | | | | | | | ____| | | | | |_ | | | | | | | | | | co_redis co_mongo co_curl co_cpu co_mysql parallel接口 co_rabbitmq co_zookeeper | | | | | | | | | | | | | | | | | | | | | | | | ----------------------------------------------------------------------------- | promise | ----------------------------------------------------------------------------- | | | | | | | async_redis async_mongo async_curl async_cpu async_mysql async_rabbitmq async_zookeeper | | | | | | | | | | | | | | --------------------------------------------------------------------------- | | thread_pool(*可选用) 线程池是选用模块,建议使用,能使异步io运行速度变快。 如果不使用,则由调用线程来驱动异步io的运行 异步io模型的实现原理描述: 第一种:同步core,如:async_cpu、async_mongo、async_rabbitmq 第二种:异步串行core,如:aysnc_mysql 第三中:异步并行core,如: aysnc_curl、aysnc_redis、aysnc_zookeeper 同步core: 1、work_thread 投递一个请求,将请求带上work_thread的信息进行包装,然后请求被放入本地请求队列 2、work_thread运行loop (1)localStatistics打印统计信息 (2)localRespond回调回复 (3)从本地队列中弹出一个请求,将请求包装成一个任务抛入io任务队列 3、io线程池 (1)从io任务队列中取出一个任务 (2)处理任务:根据请求信息在全局core队列中取出一个合适的core (3) 如果core队列中没有合适的core,则创建一个新的core (4) core处理完请求拿到请求结果,将结果放入work_thread的回复队列 (5) core失效则关闭掉,core有效,则放回全局core队列 异步串行core: 1、维护一个全局的globaldata,globaldata有一张连接池hash表,连接池中的每个连接有一个待处理请求列表 2、work_thread 投递一个请求,将请求带上work_thread的信息进行包装,然后请求被放入本地请求队列 3、work_thread运行loop (1)localStatistics打印统计信息 (2)localRespond回调回复 (3)遍历本地队列 1)处理请求超时的情况 2)从valid core列表取出一个有效的core,且core并发处理任务没有超过1个,则将请求放入core的待处理队列 3)没有有效的core,且core的数量没有超过supposeIothread限制,则创建一个新core放入conning core列表 4)如果core的数量已达到上线,则将任务均衡挂载到每个core上 (4)利用原子操作抢占dispatchTask标识 1)抢占成功 2)遍历globaldata连接池 3)将core平均分发到每个线程中去执行任务 4、io线程池 (1)从io任务队列取出一个任务 (2)处理任务: 1)处理core的连接状态 2)处理core的待处理任务列表 并行异步core: 1、维护一个全局的globaldata,globaldata有一张连接池hash表,连接池中的每个连接有一个待处理请求列表 2、work_thread 投递一个请求,将请求带上work_thread的信息进行包装,然后请求被放入本地请求队列 3、work_thread运行loop (1)localStatistics打印统计信息 (2)localRespond回调回复 (3)遍历本地队列 1)处理请求超时的情况 2)从valid core列表取出一个有效的core,且core并发处理任务没有超过上限,则将请求放入core的待处理队列 3)没有有效的core,且core的数量没有超过supposeIothread限制,则创建一个新core放入conning core列表 4)如果core的数量已达到上线,则将任务均衡挂载到每个core上 (4)利用原子操作抢占dispatchTask标识 1)抢占成功 2)遍历globaldata连接池 3)将core平均分发到每个线程中去执行任务 4、io线程池 (1)从io任务队列取出一个任务 (2)处理任务: 1)处理core的连接状态 2)处理core的待处理任务列表 example: // 下面代码的输出结果是: promise begin 异步执行结束 promise over:0 void promise_test() { // 起一个协程任务,回调将在协程里运行 CoroutineTask::doTask([](void*){ printf("promise begin\n"); auto ret = co_async::promise([](co_async::Resolve resolve, co_async::Reject reject) { // 两秒后执行下面的回调 co_async::setTimeout([resolve]() { printf("异步执行结束\n"); // 报告promise异步执行结束,并传递结果集,这里传nullptr resolve(nullptr); }, 2*1000); }, 10 * 1000); // promise返回,往下继续走 printf("promise over:%d\n", ret.first); }, 0); } int main() { promise_test(); //cpu_test(true); //co_parallel_test(); //curl_test(true); //mongo_test(true); //redis_test(true); //co_mysql_test(); //ipc_test(); while (true) { co_async::loop(); usleep(100); //printf("loop\n"); } return 0; } /////////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////// serve模块的使用案例步骤: 1、启动router进程: ./router -c router.json router.json的配置如下: { "comment": "本配置文件描述的是进程监听及连接端口的信息", "@zm_listen": "zeromq的监听信息列表, 一般是给路由使用", "zm_listen": [ { "world": 1, "protocol": "tcp", "ip": "0.0.0.0", "port": 5001 } ], "@routes": "描述的是将会连接到本进程的连接信息,以便做路由选择使用", "routes": [ { "world": 1, "type": 1, "id": 1 }, { "world": 1, "type": 2, "id": 1 } ] } 2、启动testServe1进程:./testServe -c serve1.json serve1.json的配置如下: { "comment": "本配置文件描述的是进程监听及连接端口的信息", "@self": "标识进程身份的信息", "self": { "world": 1, "type": 1, "id": 1 }, "@zm_connect": "zeromq的连接信息列表", "zm_connect": [ { "protocol": "tcp", "ip": "0.0.0.0", "port": 5001 } ] } 3、启动testServe2进程:./testServe -c serve2.json serve1.json的配置如下: { "comment": "本配置文件描述的是进程监听及连接端口的信息", "@self": "标识进程身份的信息", "self": { "world": 1, "type": 2, "id": 1 }, "@zm_connect": "zeromq的连接信息列表", "zm_connect": [ { "protocol": "tcp", "ip": "0.0.0.0", "port": 5001 } ] }