# api-cplusplus **Repository Path**: dolphindb/api-cplusplus ## Basic Information - **Project Name**: api-cplusplus - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 4 - **Forks**: 5 - **Created**: 2020-06-18 - **Last Updated**: 2025-05-20 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README >注意:自 1.30.22.1 版本起,该 Readme 不再进行维护。用户可移步至[新 C++ API 手册](https://docs.dolphindb.cn/zh/api/cpp/cpp_api.html)。 本教程主要介绍以下内容: - [DolphinDB C++ API](#dolphindb-c-api) - [1. 项目编译](#1-项目编译) - [1.1 在Linux环境下编译项目](#11-在linux环境下编译项目) - [1.1.1 环境配置](#111-环境配置) - [1.1.2 下载bin文件和头文件](#112-下载bin文件和头文件) - [1.1.3 编译main.cpp](#113-编译maincpp) - [1.1.4 编译](#114-编译) - [1.1.5 运行](#115-运行) - [1.2 Windows环境下编译](#12-windows环境下编译) - [1.2.1 环境配置](#121-环境配置) - [1.2.2 下载bin文件和头文件](#122-下载bin文件和头文件) - [1.2.3 创建Visual Studio项目](#123-创建visual-studio项目) - [1.2.4 编译和运行](#124-编译和运行) - [2. 建立DolphinDB连接](#2-建立dolphindb连接) - [3. 运行DolphinDB脚本](#3-运行dolphindb脚本) - [4. 运行DolphinDB函数](#4-运行dolphindb函数) - [5. 上传数据对象](#5-上传数据对象) - [6. 读取数据示例](#6-读取数据示例) - [6.1 向量](#61-向量) - [6.2 集合](#62-集合) - [6.3 矩阵](#63-矩阵) - [6.4 字典](#64-字典) - [6.5 表](#65-表) - [6.5.1 `getString()`方法获取表的内容](#651-getstring方法获取表的内容) - [6.5.2 `getColumn()`方法按列获取表的内容](#652-getcolumn方法按列获取表的内容) - [6.5.3 `getRow()`方法按照行获取表的内容](#653-getrow方法按照行获取表的内容) - [6.5.4 使用`BlockReaderSP`对象分段读取表数据](#654-使用blockreadersp对象分段读取表数据) - [6.6 AnyVector](#66-anyvector) - [7. 保存数据到DolphinDB数据表](#7-保存数据到dolphindb数据表) - [7.1 保存数据到DolphinDB内存表](#71-保存数据到dolphindb内存表) - [7.1.1 使用insert into语句保存数据](#711-使用insert-into语句保存数据) - [7.1.2 使用tableInsert函数批量保存多条数据](#712-使用tableinsert函数批量保存多条数据) - [7.1.3 使用tableInsert函数保存TableSP对象](#713-使用tableinsert函数保存tablesp对象) - [7.2 保存数据到本地磁盘表](#72-保存数据到本地磁盘表) - [7.3 保存数据到分布式表](#73-保存数据到分布式表) - [7.3.1 使用tableInsert函数保存TableSP对象](#731-使用tableinsert函数保存tablesp对象) - [7.3.2 分布式表的并发写入](#732-分布式表的并发写入) - [8. 注意事项](#8-注意事项) - [C++ Streaming API](#c-streaming-api) - [9 编译](#9-编译) - [9.1 Linux 64位](#91-linux-64位) - [9.1.1 通过cmake](#911-通过cmake) - [10 在Windows中使用MinGW编译](#10-在windows中使用mingw编译) - [11. API](#11-api) - [11.1 ThreadedClient](#111-threadedclient) - [11.1.1 定义线程客户端](#1111-定义线程客户端) - [11.1.2 调用订阅函数](#1112-调用订阅函数) - [11.1.3 取消订阅](#1113-取消订阅) - [11.2 ThreadPooledClient](#112-threadpooledclient) - [11.2.1 定义多线程客户端](#1121-定义多线程客户端) - [11.2.2 调用订阅函数](#1122-调用订阅函数) - [11.2.3 取消订阅](#1123-取消订阅) - [11.3 PollingClient](#113-pollingclient) - [11.3.1 定义客户端](#1131-定义客户端) - [11.3.2 订阅](#1132-订阅) - [11.3.3 取消订阅](#1133-取消订阅) # DolphinDB C++ API DolphinDB C++ API支持以下开发环境: * Linux * Windows Visual Studio * Windows GNU(MinGW) ### 1. 项目编译 ### 1.1 在Linux环境下编译项目 #### 1.1.1 环境配置 C++ API需要使用g++ 4.8.5及以上版本。 #### 1.1.2 下载bin文件和头文件 从本GitHub项目中[下载](https://github.com/dolphindb/api-cplusplus/releases)以下文件: - libDolphinDBAPI.so - DolphinDB.h Exceptions.h SmartPointer.h SysIO.h Types.h Util.h #### 1.1.3 编译main.cpp 在bin和include的同级目录中创建project目录。进入project目录,并创建文件main.cpp: ```C++ #include "DolphinDB.h" #include "Util.h" #include #include using namespace dolphindb; using namespace std; int main(int argc, char *argv[]){ DBConnection conn; bool ret = conn.connect("111.222.3.44", 8503); if(!ret){ cout<<"Failed to connect to the server"<rows(); for(int i=0; igetString(i)< 由于VS里默认定义了min/max两个宏,会与头文件中 `min` 和 `max` 函数冲突。为了解决这个问题,在预处理宏定义中需要加入 `NOMINMAX` 。 > api源代码中用宏定义LINUX、WINDOWS等区分不同平台,因此在预处理宏定义中需要加入 `WINDOWS`。 #### 1.2.4 编译和运行 启动编译,将对应的libDolphinDBAPI.dll拷贝到可执行程序的输出目录,即可运行。 Windows gnu开发环境与Linux相似,可以参考上一章的Linux编译。 ### 2. 建立DolphinDB连接 DolphinDB C++ API 提供的最核心的对象是DBConnection。C++应用可以通过它在DolphinDB服务器上执行脚本和函数,并在两者之间双向传递数据。DBConnection类提供如下主要方法: | 方法名 | 详情 | |:------------- |:-------------| |connect(host, port, [username, password])|将会话连接到DolphinDB服务器| |login(username,password,enableEncryption)|登陆服务器| |run(script)|将脚本在DolphinDB服务器运行| |run(functionName,args)|调用DolphinDB服务器上的函数| |upload(variableObjectMap)|将本地数据对象上传到DolphinDB服务器| |initialize()|初始化连接信息| |close()|关闭当前会话| C++ API通过TCP/IP协议连接到DolphinDB。使用 `connect` 方法创建连接时,需要提供DolphinDB server的IP和端口。 ```C++ DBConnection conn; bool ret = conn.connect("127.0.0.1", 8848); ``` 我们创建连接时也可以使用用户名和密码登录,默认的管理员名称为"admin",密码是"123456"。 ```C++ DBConnection conn; bool ret = conn.connect("127.0.0.1", 8848, "admin", "123456"); ``` 若未使用用户名及密码连接成功,则脚本在Guest权限下运行。后续运行中若需要提升权限,可以使用 conn.login('admin','123456',true) 登录获取权限。 声明connection变量的时候,有两个可选参数: enableSSL(支持SSL), enableAYSN(支持一部分).这两个参数默认值为false. 下面例子是,建立支持SSL而非支持异步的connection,同时服务器端应该添加参数enableHTTPS=true。 ```C++ DBConnection conn(true,false) ``` 下面建立即不支持SSL,但支持异步的connection。异步情况下,步只能执行DolphinDB脚本和函数, 且不再有返回值,该功能适用于异步写入数据。 ```C++ DBConnection conn(false,true) ``` ### 3. 运行DolphinDB脚本 通过 `run` 方法运行DolphinDB脚本: ```C++ ConstantSP v = conn.run(" `IBM` GOOG`YHOO"); cout<getString()< ["IBM", "GOOG", "YHOO"] 需要注意的是,脚本的最大长度为65, 535字节。 ### 4. 运行DolphinDB函数 除了运行脚本之外,run命令还可以直接在远程DolphinDB服务器上执行DolphinDB内置或用户自定义函数。若 `run` 方法只有一个参数,则该参数为脚本;若 `run` 方法有两个参数,则第一个参数为DolphinDB中的函数名,第二个参数是该函数的参数,为ConstantSP类型的向量。 下面的示例展示C++程序通过 `run` 调用DolphinDB内置的 [add](http://www.dolphindb.cn/cn/help/add.html) 函数。 [add](http://www.dolphindb.cn/cn/help/add.html) 函数有两个参数 x 和 y。参数的存储位置不同,也会导致调用方式的不同。可能有以下三种情况: * 所有参数都在DolphinDB server端 若变量 x 和 y 已经通过C++程序在服务器端生成, ```C++ conn.run("x = [1, 3, 5]; y = [2, 4, 6]"); ``` 那么在C++端要对这两个向量做加法运算,只需直接使用 `run` 即可。 ```C++ ConstantSP result = conn.run("add(x,y)"); cout<getString()< [3, 7, 11] * 仅有一个参数在DolphinDB server端存在 若变量 x 已经通过C++程序在服务器端生成, ```C++ conn.run("x = [1, 3, 5]"); ``` 而参数 y 要在C++客户端生成,这时就需要使用“部分应用”方式,把参数 x 固化在 [add](http://www.dolphindb.cn/cn/help/add.html) 函数内。具体请参考[部分应用文档](https://www.dolphindb.cn/cn/help/PartialApplication.html)。 ```C++ vector args; ConstantSP y = Util::createVector(DT_DOUBLE, 3); double array_y[] = {1.5, 2.5, 7}; y->setDouble(0, 3, array_y); args.push_back(y); ConstantSP result = conn.run("add{x,}", args); cout<getString()< [2.5, 5.5, 12] * 两个参数都待由C++客户端赋值 ```C++ vector args; ConstantSP x = Util::createVector(DT_DOUBLE, 3); double array_x[] = {1.5, 2.5, 7}; x->setDouble(0, 3, array_x); ConstantSP y = Util::createVector(DT_DOUBLE, 3); double array_y[] = {8.5, 7.5, 3}; y->setDouble(0, 3, array_y); args.push_back(x); args.push_back(y); ConstantSP result = conn.run("add", args); cout<getString()< [10, 10, 10] ### 5. 上传数据对象 C++ API提供 `upload` 方法,将本地对象上传到DolphinDB。 下面的例子在C++定义了一个 `createDemoTable` 函数,该函数创建了一个本地的表对象。 ```C++ TableSP createDemoTable(){ vector colNames = {"name", "date"," price"}; vector colTypes = {DT_STRING, DT_DATE, DT_DOUBLE}; int colNum = 3, rowNum = 10000, indexCapacity=10000; ConstantSP table = Util::createTable(colNames, colTypes, rowNum, indexCapacity); vector columnVecs; for(int i = 0; i < colNum; ++i) columnVecs.push_back(table->getColumn(i)); for(unsigned int i = 0 i < rowNum; ++i){ columnVecs[0]->set(i, Util::createString("name_"+std::to_string(i))); columnVecs[1]->set(i, Util::createDate(2010, 1, i+1)); columnVecs[2]->set(i, Util::createDouble((rand()%100)/3.0)); } return table; } ``` 需要注意的是,上述例子中采用的 `set` 方法作为一个虚函数,会产生较大的开销,调用 `set` 方法对表的列向量逐个赋值,在数据量很大的情况下会导致效率低下。此外, `createString` , `createDate` , `createDouble` 等构造方法要求操作系统分配内存,反复调用同样会产生很大的开销。 相对合理的做法是定义一个相应类型的数组,通过诸如 setInt(INDEX start, int len, const int* buf) 的方式一次或者多次地将数据批量传给列向量。 当表对象的数据量较小时,可以采用上述例子中的方式生成 TableSP 对象的数据,但是当数据量较多时,建议采用如下方式来生成数据。 ```C++ TableSP createDemoTable(){ vector colNames = {"name", "date", "price"}; vector colTypes = {DT_STRING, DT_DATE, DT_DOUBLE}; int colNum = 3, rowNum = 10000, indexCapacity=10000; ConstantSP table = Util::createTable(colNames, colTypes, rowNum, indexCapacity); vector columnVecs; for(int i = 0; i < colNum; ++i) columnVecs.push_back(table->getColumn(i)); int array_dt_buf[Util::BUF_SIZE]; //定义date列缓冲区数组 double array_db_buf[Util::BUF_SIZE]; //定义price列缓冲区数组 int start = 0; int no=0; while (start < rowNum) { size_t len = std::min(Util::BUF_SIZE, rowNum - start); int *dtp = columnVecs[1]->getIntBuffer(start, len, array_dt_buf); //dtp指向每次通过 `getIntBuffer` 得到的缓冲区的头部 double *dbp = columnVecs[2]->getDoubleBuffer(start, len, array_db_buf); //dbp指向每次通过 `getDoubleBuffer` 得到的缓冲区的头部 for (int i = 0; i < len; ++i) { columnVecs[0]->setString(i+start, "name_"+std::to_string(++no)); //对string类型的name列直接进行赋值,不采用getbuffer的方式 dtp[i] = 17898+i; dbp[i] = (rand()%100)/3.0; } columnVecs[1]->setInt(start, len, dtp); //写完后使用 `setInt` 将缓冲区写回数组 columnVecs[2]->setDouble(start, len, dbp); //写完后使用 `setDouble` 将缓冲区写回数组 start += len; } return table; } ``` 上述例子采用的诸如 `getIntBuffer` 等方法能够直接获取一个可读写的缓冲区,写完后使用 `setInt` 将缓冲区写回数组,这类函数会检查给定的缓冲区地址和变量底层储存的地址是否一致,如果一致就不会发生数据拷贝。在多数情况下,用 `getIntBuffer` 获得的缓冲区就是变量实际的存储区域,这样能减少数据拷贝,提高性能。 以下利用自定义的 `createDemoTable` 函数创建表对象之后,通过 `upload` 方法把它上传到DolphinDB,再从DolphinDB获取这个表的数据,保存到本地对象result并打印。 ```C++ TableSP table = createDemoTable(); conn.upload("myTable", table); string script = "select * from myTable;"; ConstantSP result = conn.run(script); cout<getString()<size(); for(int i = 0; i < size; ++i) cout<getInt(i)<size(); for(int i = 0; i < size; ++i) cout<getString(i)<getString()<getString()<get(Util::createInt(1))->getString()<getString()< columnVecs; int qty[200],sum[200]; double price[200]; for(int i=0; i<200;++i){ qty[i]=atoi(columnVecs[2]->getString(i).c_str()); price[i]=atof(columnVecs[3]->getString(i).c_str()); sum[i]=qty[i]*price[i]; } for(int i = 0; i < 200; ++i){ cout<getString(i)<<", "<getString(i)<<", "<getRow(0)->getString()<37.811678 qty->410 sym->IBM timestamp->13:45:15 ``` 若要先按行获取table的内容,再对其中的数据进行操作,则需要调用`getMember()`方法来获取对应列的数据。其中,`getMember()`函数的参数不是C++内置的string类型对象,而是DolphinDB C++ API的string类型Constant对象。 ```C++ cout<getRow(0)->getMember(Util::createString("price"))->getDouble()<hasNext()){ table=reader->read(); total += table->size(); cout<< "read" <size()<getString()<getString()<get(2); cout<getString()<getIntBuffer(start, len, array_dt_buf); //dtp指向每次通过 `getIntBuffer` 得到的缓冲区的头部 double *dbp = prices->getDoubleBuffer(start, len, array_db_buf); //dbp指向每次通过 `getDoubleBuffer` 得到的缓冲区的头部 for (int i = 0; i < len; i++) { names->setString(i+start, "name_"+std::to_string(++no)); //对string类型的name列直接进行赋值,不采用getbuffer的方式 dtp[i] = 17898+i; dbp[i] = (rand()%100)/3.0; } dates->setInt(start, len, dtp); //写完后使用 `setInt` 将缓冲区写回数组 prices->setDouble(start, len, dbp); //写完后使用 `setDouble` 将缓冲区写回数组 start += len; } vector allnames = {"names", "dates", "prices"}; vector allcols = {names, dates, prices}; conn.upload(allnames, allcols); script += "insert into tglobal values(names, dates, prices); tglobal"; TableSP table = conn.run(script); ``` #### 7.1.2 使用tableInsert函数批量保存多条数据 在这个例子中,我们利用索引指定TableSP对象的多行数据,将它们批量保存到DolphinDB server上。 ```C++ vector args; TableSP table = createDemoTable(); VectorSP range = Util::createPair(DT_INDEX); range->setIndex(0, 0); range->setIndex(1, 10); cout<getString()<get(range)); conn.run("tableInsert{tglobal}", args); ``` #### 7.1.3 使用tableInsert函数保存TableSP对象 ```C++ vector args; TableSP table = createDemoTable(); args.push_back(table); conn.run("tableInsert{tglobal}", args); ``` 把数据保存到内存表,还可以使用[append!](http://www.dolphindb.cn/cn/help/append1.html)函数,它可以把一张表追加到另一张表。但是,一般不建议通过[append!](http://www.dolphindb.cn/cn/help/append1.html)函数保存数据,因为[append!](http://www.dolphindb.cn/cn/help/append1.html)函数会返回一个表的schema,增加通信量。 ```C++ vector args; TableSP table = createDemoTable(); args.push_back(table); conn.run("append!(tglobal);", args); ``` #### 7.2 保存数据到本地磁盘表 本地磁盘表通用用于静态数据集的计算分析,既可以用于数据的输入,也可以作为计算的输出。它不支持事务,也不持支并发读写。 在DolphinDB中使用以下脚本创建一个本地磁盘表,使用[database](http://www.dolphindb.cn/cn/help/database1.html)函数创建数据库,调用[saveTable](http://www.dolphindb.cn/cn/help/saveTable.html)命令将内存表保存到磁盘中: ``` t = table(100:0, `name` date`price, [STRING,DATE,DOUBLE]); db=database("/home/dolphindb/demoDB"); saveTable(db, t, `dt); share t as tDiskGlobal; ``` 使用[tableInsert](http://www.dolphindb.cn/cn/help/tableInsert.html)函数是向本地磁盘表追加数据最为常用的方式。这个例子中,我们使用[tableInsert](http://www.dolphindb.cn/cn/help/tableInsert.html)向共享的内存表tDiskGlobal中插入数据,接着调用[saveTable](http://www.dolphindb.cn/cn/help/saveTable.html)把插入的数据保存到磁盘上。 ```C++ TableSP table = createDemoTable(); vector args; args.push_back(table); conn.run("tableInsert{tDiskGlobal}", args); conn.run("saveTable(db, tDiskGlobal, `dt); "); ``` 本地磁盘表支持使用[append!](http://www.dolphindb.cn/cn/help/append1.html)函数把数据追加到表中: ```C++ TableSP table = createDemoTable(); conn.upload("mt", table); string script; script += "db=database(\"/home/demoTable1\");"; script += "tDiskGlobal.append!(mt);"; script += "saveTable(db,tDiskGlobal,`dt);"; conn.run(script); ``` 注意: 1. 对于本地磁盘表,[append!](http://www.dolphindb.cn/cn/help/append1.html)函数只把数据追加到内存,如果要保存到磁盘上,必须再次执行[saveTable](http://www.dolphindb.cn/cn/help/saveTable.html)函数。 2. 除了使用[share](http://www.dolphindb.cn/cn/help/share1.html)让表在其他会话中可见,也可以在C++ API中使用[loadTable](http://www.dolphindb.cn/cn/help/loadTable.html)来加载磁盘表,使用[append!](http://www.dolphindb.cn/cn/help/append1.html)来追加数据。但是,我们不推荐这种方法,因为[loadTable](http://www.dolphindb.cn/cn/help/loadTable.html)函数从磁盘加载数据,会消耗大量时间。如果有多个客户端都使用[loadTable](http://www.dolphindb.cn/cn/help/loadTable.html) ,内存中会有多个表的副本,造成数据不一致。 #### 7.3 保存数据到分布式表 分布式表是DolphinDB推荐在生产环境下使用的数据存储方式,它支持快照级别的事务隔离,保证数据一致性。分布式表支持多副本机制,既提供了数据容错能力,又能作为数据访问的负载均衡。下面的例子通过C++ API把数据保存至分布式表。 #### 7.3.1 使用tableInsert函数保存TableSP对象 在DolphinDB中使用以下脚本创建分布式表。[database](http://www.dolphindb.cn/cn/help/database1.html)函数用于创建数据库,对于分布式数据库,路径必须以 dfs 开头。[createPartitionedTable](http://www.dolphindb.cn/cn/help/createPartitionedTable.html)函数用于创建分区表。 ``` login( `admin, ` 123456) dbPath = "dfs://SAMPLE_TRDDB"; tableName = `demoTable db = database(dbPath, VALUE, 2010.01.01..2010.01.30) pt=db.createPartitionedTable(table(1000000:0, `name` date `price, [STRING,DATE,DOUBLE]), tableName, ` date) ``` 使用[loadTable](http://www.dolphindb.cn/cn/help/loadTable.html)方法加载分布式表,通过[tableInsert](http://www.dolphindb.cn/cn/help/tableInsert.html)方式追加数据: ```C++ TableSP table = createDemoTable(); vector args; args.push_back(table); conn.run("tableInsert{loadTable('dfs://SAMPLE_TRDDB', `demoTable)}", args); ``` [append!](http://www.dolphindb.cn/cn/help/append1.html)函数也能向分布式表追加数据,但是性能与[tableInsert](http://www.dolphindb.cn/cn/help/tableInsert.html)相比要差,建议不要轻易使用: ```C++ TableSP table = createDemoTable(); conn.upload("mt", table); conn.run("loadTable('dfs://SAMPLE_TRDDB', `demoTable).append!(mt);"); conn.run(script); ``` #### 7.3.2 分布式表的并发写入 DolphinDB的分布式表支持并发读写,下面展示如何在C++客户端中将数据并发写入DolphinDB的分布式表。 首先,在DolphinDB服务端执行以下脚本,创建分布式数据库"dfs://natlog"和分布式表"natlogrecords"。其中,数据库按照VALUE-HASH-HASH的组合进行三级分区。 ``` dbName="dfs://natlog" tableName="natlogrecords" db1 = database("", VALUE, datehour(2019.09.11T00:00:00)..datehour(2019.12.30T00:00:00) )//starttime, newValuePartitionPolicy=add db2 = database("", HASH, [IPADDR, 50]) //source_address db3 = database("", HASH, [IPADDR, 50]) //destination_address db = database(dbName, COMPO, [db1,db2,db3]) data = table(1:0, ["fwname","filename","source_address","source_port","destination_address","destination_port","nat_source_address","nat_source_port","starttime","stoptime","elapsed_time"], [SYMBOL,STRING,IPADDR,INT,IPADDR,INT,IPADDR,INT,DATETIME,DATETIME,INT]) db.createPartitionedTable(data,tableName,`starttime`source_address`destination_address) ``` > 请注意:DolphinDB不允许多个writer同时将数据写入到同一个分区,因此在客户端多线程并行写入数据时,需要确保每个线程分别写入不同的分区。 对于按哈希值进行分区的分布式表, DolphinDB C++ API 提供了`getHash`函数来数据的hash值。在客户端设计多线程并发写入分布式表时,根据哈希分区字段数据的哈希值分组,每组指定一个写线程。这样就能保证每个线程同时将数据写到不同的哈希分区。 ```C++ ConstantSP spIP = Util::createConstant(DT_IP); int key = spIP->getHash(BUCKETS); ``` 开启生产数据和消费数据的线程,下面的`genData`用于生成模拟数据,`writeData`用于写数据。 ```C++ for (int i = 0; i < tLong; ++i) { arg[i].index = i; arg[i].count = tLong; arg[i].nLong = nLong; arg[i].cLong = cLong; arg[i].nTime = 0; arg[i].nStarttime = sLong; genThreads[i] = std::thread(genData, (void *)&arg[i]); writeThreads[i] = std::thread(writeData, (void *)&arg[i]); } ``` 每个生产线程首先生成数据,其中`createDemoTable`函数用于产生模拟数据,并返回一个TableSP对象。 ```C++ void *genData(void *arg) { struct parameter *pParam; pParam = (struct parameter *)arg; long partitionCount = BUCKETS / pParam->count; for (unsigned int i = 0; i < pParam->nLong; i++) { TableSP table = createDemoTable(pParam->cLong, partitionCount * pParam->index, partitionCount, pParam->nStarttime, i * 5); tableQueue[pParam->index]->push(table); } return NULL; } ``` 每个消费线程开始向DolphinDB并行写入数据。 ```C++ void *writeData(void *arg) { struct parameter *pParam; pParam = (struct parameter *)arg; TableSP table; for (unsigned int i = 0; i < pParam->nLong; i++) { tableQueue[pParam->index]->pop(table); long long startTime = Util::getEpochTime(); vector args; args.push_back(table); conn[pParam->index].run("tableInsert{loadTable('dfs://natlog', `natlogrecords)}", args); pParam->nTime += Util::getEpochTime() - startTime; } printf("Thread %d,insert %ld rows %ld times, used %ld ms.\n", pParam->index, pParam->cLong, pParam->nLong, pParam->nTime); return NULL; } ``` 更多分布式表的并发写入案例可以参考样例[MultiThreadDFSWriting.cpp](./example/DFSWritingWithMultiThread/MultiThreadDfsWriting.cpp)。 ### 8. 注意事项 1. DBConnection类的所有函数都不是线程安全的,不可以并行调用,否则可能会导致程序崩溃。 关于C++ API的更多信息,可以参考C++ API 头文件[dolphindb.h](./include/DolphinDB.h)。 # C++ Streaming API C++ API处理流数据的方式有三种:ThreadedClient, ThreadPooledClient 和 PollingClient。 三种实现方式可以参考[test/StreamingThreadedClientTester.cpp](./test/StreamingThreadedClientTester.cpp), [test/StreamingThreadPooledClientTester.cpp](./test/StreamingThreadPooledClientTester.cpp) 和 [test/StreamingPollingClientTester.cpp](./test/StreamingPollingClientTester.cpp)。 ### 9 编译 #### 9.1 Linux 64位 #### 9.1.1 通过cmake 安装cmake: ``` bash sudo apt-get install cmake ``` 编译: ``` bash mkdir build && cd build cmake -DCMAKE_BUILD_TYPE=Release ../path_to_api-cplusplus/ make -j `nproc` ``` 编译成功后,会生成三个可执行文件。 #### 10 在Windows中使用MinGW编译 安装[MinGW](http://www.mingw.org/)和[cmake](https://cmake.org/): ``` bash mkdir build && cd build cmake -DCMAKE_BUILD_TYPE=Release `path_to_api-cplusplus` -G "MinGW Makefiles" mingw32-make -j `nproc` ``` 编译成功后,会生成三个可执行文件。 注意: 1. 编译前,需要把libDolphinDBAPI.dll复制到编译目录。 2. 执行例子前,需要把libDolphinDBAPI.dll和libgcc_s_seh-1.dll复制到可执行文件的相同目录下。 ### 11. API #### 11.1 ThreadedClient ThreadedClient 产生一个线程。每次新数据从流数据表发布时,该线程去获取和处理数据。 #### 11.1.1 定义线程客户端 ``` ThreadedClient::ThreadClient(int listeningPort); ``` * listeningPort 是单线程客户端的订阅端口号。 #### 11.1.2 调用订阅函数 ``` ThreadSP ThreadedClient::subscribe(string host, int port, MessageHandler handler, string tableName, string actionName = DEFAULT_ACTION_NAME, int64_t offset = -1, bool resub = true, VectorSP filter = nullptr); ``` * host是发布端节点的主机名。 * port是发布端节点的端口号。 * handler是用户自定义的回调函数,用于处理每次流入的消息。函数的参数是流入的消息,每条消息就是六数据表的一行。函数的结果必须是void。 * tableName是字符串,表示发布端上共享流数据表的名称。 * actionName是字符串,表示订阅任务的名称。它可以包含字母、数字和下划线。 * offset是整数,表示订阅任务开始后的第一条消息所在的位置。消息是流数据表中的行。如果没有指定offset,或它为负数或超过了流数据表的记录行数,订阅将会从流数据表的当前行开始。offset与流数据表创建时的第一行对应。如果某些行因为内存限制被删除,在决定订阅开始的位置时,这些行仍然考虑在内。 * resub是布尔值,表示订阅中断后,是否会自动重订阅。 * filter是一个向量,表示过滤条件。流数据表过滤列在filter中的数据才会发布到订阅端,不在filter中的数据不会发布。 ThreadSP 指向循环调用handler的线程的指针。该线程在此topic被取消订阅后会退出。 示例: ``` auto t = client.subscribe(host, port, [](Message msg) { // user-defined routine }, tableName); t->join(); ``` #### 11.1.3 取消订阅 ``` void ThreadClient::unsubscribe(string host, int port, string tableName, string actionName = DEFAULT_ACTION_NAME); ``` * host 是发布端节点的主机名。 * port 是发布端节点的端口号。 * tableName 是字符串,表示发布端上共享流数据表的名称。 * actionName 是字符串,表示订阅任务的名称。它可以包含字母、数字和下划线。 该函数用于停止向发布者订阅数据。 #### 11.2 ThreadPooledClient ThreadPooledClient 产生用户指定数量的多个线程。每次新数据从流数据表发布时,这些线程同时去获取和处理数据。当数据到达速度超过单个线程所能处理的限度时,ThreadPooledClient 比 ThreadedClient 有优势。 #### 11.2.1 定义多线程客户端 ``` ThreadPooledClient::ThreadPooledClient(int listeningPort, int threadCount); ``` * listeningPort 是多线程客户端节点的订阅端口号。 * threadCount 是线程池的大小。 #### 11.2.2 调用订阅函数 ``` vector ThreadPooledClient::subscribe(string host, int port, MessageHandler handler, string tableName, string actionName = DEFAULT_ACTION_NAME, int64_t offset = -1, bool resub = true, VectorSP filter = nullptr); ``` 参数参见2.1.2节。 返回一个指针向量,每个指针指向循环调用handler的线程。这些线程在此topic被取消订阅后会退出。 示例: ``` auto vec = client.subscribe(host, port, [](Message msg) { // user-defined routine }, tableName); for(auto& t : vec) { t->join(); } ``` #### 11.2.3 取消订阅 ``` void ThreadPooledClient::unsubscribe(string host, int port, string tableName, string actionName = DEFAULT_ACTION_NAME); ``` 参数参见2.1.3节。 #### 11.3 PollingClient 订阅数据时,会返回一个消息队列。用户可以从其中获取和处理数据。 #### 11.3.1 定义客户端 ``` PollingClient::PollingClient(int listeningPort); ``` * listeningPort 是客户端节点的订阅端口号。 #### 11.3.2 订阅 ``` MessageQueueSP PollingClient::subscribe(string host, int port, string tableName, string actionName = DEFAULT_ACTION_NAME, int64_t offset = -1); ``` 参数参见2.1.2节。 该函数返回指向消息队列的指针。 示例: ``` auto queue = client.subscribe(host, port, handler, tableName); Message msg; while(true) { if(queue->poll(msg, 1000)) { if(msg.isNull()) break; // handle msg } } ``` #### 11.3.3 取消订阅 ``` void PollingClient::unsubscribe(string host, int port, string tableName, string actionName = DEFAULT_ACTION_NAME); ``` 参数参见2.1.3节。 注意,对于这种订阅模式,取消订阅时,会返回一个空指针,用户需要自行处理。