# DCF **Repository Path**: liuzhanfeng2/DCF ## Basic Information - **Project Name**: DCF - **Description**: No description available - **Primary Language**: Unknown - **License**: MulanPSL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 54 - **Created**: 2025-03-03 - **Last Updated**: 2025-03-03 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Distributed Consensus Framework 分布式一致性框架 #### 一、工程说明 ##### 1、编程语言:C ##### 2、编译工具:cmake或make,建议使用cmake ##### 3、目录说明: - DCF:主目录,CMakeLists.txt为主工程入口; - src::源代码目录,按子目录划分模块解耦; - test:测试用例 - build:工程构建脚本 #### 二、编译指导与工程构建 ##### 概述 编译DCF需要dcf和binarylibs两个组件。 - dcf:dcf的主要代码。可以从开源社区获取。 - binarylibs:依赖的第三方开源软件,你可以直接编译openGauss-third_party代码获取,也可以从开源社区下载已经编译好的并上传的一个副本。 ##### 操作系统和软件依赖要求 支持以下操作系统: - CentOS 7.6(x86) - openEuler-20.03-LTS
- openEuler-22.03-LTS
- openEuler-24.03-LTS
适配其他系统,可参照openGauss数据库编译指导
当前DCF依赖第三方软件有securec、lz4、zstd、openssl、cjson; 编译dcf依赖的第三方软件要求与编译opengauss对依赖的第三方软件要求一致。 ##### 下载dcf及第三方软件 对于dcf代码,开发人员可以从本仓库([https://gitee.com/opengauss/DCF](https://gitee.com/opengauss/DCF))直接clone到本地。 对于第三方软件, OpenGauss所有的第三方软件都统一维护在[https://gitee.com/opengauss/openGauss-third_party](https://gitee.com/opengauss/openGauss-third_party)仓库中。开发人员可以通过两种方式获取需要的第三方软件: 1. 直接下载编译好的openGauss-third_party二进制包。2. 下载源码自行编译。 如果想要直接下载编译好的openGauss-third_party二进制包,请参考仓库[https://gitee.com/opengauss/openGauss-third_party](https://gitee.com/opengauss/openGauss-third_party)中的ReadMe文档,其中维护了针对不同的操作系统和软件版本的编译好的openGauss-third_party二进制包的下载链接。比如,对于OpenEuler_x86系统,gcc的10.3以上版本,最新的编译好的第三方软件二进制包下载链接为[https://opengauss.obs.cn-south-1.myhuaweicloud.com/latest/binarylibs/gcc10.3/openGauss-third_party_binarylibs_openEuler_x86_64.tar.gz](https://opengauss.obs.cn-south-1.myhuaweicloud.com/latest/binarylibs/gcc10.3/openGauss-third_party_binarylibs_openEuler_x86_64.tar.gz)。 如果想要自己编译,请将仓库[https://gitee.com/opengauss/openGauss-third_party](https://gitee.com/opengauss/openGauss-third_party)完整clone到本地,并参考**该仓库ReadMe文档中的构建过程**和本文档后续的**编译第三方软件**章节。 ##### 编译第三方软件 在编译dcf之前,需要先编译dcf依赖的开源及第三方软件。这些开源及第三方软件存储在openGauss-third_party代码仓库中,通常只需要构建一次。如果开源软件有更新,需要重新构建软件。
用户也可以直接从binarylibs库中获取开源软件编译和构建的输出文件。 ##### 代码编译 使用DCF/build/linux/opengauss/build.sh编译代码, 参数说明请见以下表格。
| 选项 | 参数 | 说明 | | --- |:--- | :--- | | -3rd | [binarylibs path] | 指定binarylibs路径。该路径必须是绝对路径。| | -m | [version_mode] | 编译目标版本,Debug或者Release。默认Release| | -t | [build_tool] | 指定编译工具,cmake或者make。默认cmake| 现在只需使用如下命令即可编译:
[user@linux ]$ sh build.sh -3rd [binarylibs path] -m Release -t cmake
完成编译后,动态库生成在DCF/output/lib目录中 #### 三、接口说明 ##### 1、API列表 DCF角色定义: ``` typedef enum en_dcf_role { DCF_ROLE_UNKNOWN = 0, DCF_ROLE_LEADER, DCF_ROLE_FOLLOWER, DCF_ROLE_LOGGER, DCF_ROLE_PASSIVE, DCF_ROLE_PRE_CANDIDATE, DCF_ROLE_CANDIDATE, DCF_ROLE_CEIL, } dcf_role_t; ``` - ___int dcf_set_param(const char *param_name, const char *param_value);___ 功能说明:设置DCF配置参数 参数说明:param_name是需要设置的参数名称,param_value是需要设置的参数值。 参数名称有以下类型: "ELECTION_TIMEOUT" --选举超时时间,单位ms "HEARTBEAT_INTERVAL" --心跳间隔,单位ms "RUN_MODE" --运行模式,ELECTION_AUTO或ELECTION_MANUAL "INSTANCE_NAME" --实例名称 "DATA_PATH" --数据文件路径 "LOG_PATH" --日志文件路径 "LOG_LEVEL" --最大日志级别"RUN_ERR|RUN_WAR|RUN_INF|DEBUG_ERR|DEBUG_WAR|DEBUG_INF|MEC|OPER|TRACE|PROFILE", 需要开启自定义级别,从上述字符串中选取并使用|分割; 默认级别"RUN_ERR|RUN_WAR|DEBUG_ERR|OPER" 若需要关闭日志打印,配置"NONE" "LOG_BACKUP_FILE_COUNT" --日志备份文件数 "MAX_LOG_FILE_SIZE" --日志文件最大size,单位MB "LOG_FILE_PERMISSION" --日志文件权限,权限不高于700 "LOG_PATH_PERMISSION" --日志路径权限,权限不高于700 "MEC_AGENT_THREAD_NUM" --通信agent线程数量 "MEC_REACTOR_THREAD_NUM" --通信reactor线程数量 "MEC_CHANNEL_NUM" --通信通道数量 "MEM_POOL_INIT_SIZE" --共用buddy pool的初始size "MEM_POOL_MAX_SIZE" --共用buddy pool的最大size "COMPRESS_ALGORITHM" --通信压缩算法, 0:COMPRESS_NONE, 1:COMPRESS_ZSTD, 2:COMPRESS_LZ4 "COMPRESS_LEVEL" --压缩级别 "SOCKET_TIMEOUT" --socket收发报文超时时间,单位ms "CONNECT_TIMEOUT" --连接超时时间,单位ms "REP_APPEND_THREAD_NUM" --leader节点发送日志的线程数 "MEC_FRAGMENT_SIZE" --通信消息buffer size "STG_POOL_INIT_SIZE" --存储pool初始size "STG_POOL_MAX_SIZE" --存储pool最大size,存储有读写两个pool,这里是单个pool的size "MEC_POOL_MAX_SIZE" --通信pool最大size,通信有收发两个pool,这里是单个pool的size "FLOW_CONTROL_CPU_THRESHOLD" -- CPU使用率超过此值时会对passive节点的日志同步进行流控,单位% "FLOW_CONTROL_NET_QUEUE_MESSAGE_NUM_THRESHOLD" -- DCF发送日志队列中消息超过此值时会对passive节点的日志同步进行流控 "FLOW_CONTROL_DISK_RAWAIT_THRESHOLD" -- 磁盘读延时超过此值时会对passive节点的日志同步进行流控,单位us "DN_FLOW_CONTROL_RTO" -- DN流控参数,结合接口dcf_pause_rep使用 "DN_FLOW_CONTROL_RPO" -- DN流控参数,结合接口dcf_pause_rep使用 - ___int dcf_get_param(const char *param_name, const char *param_value, unsigned int size);___ 功能说明:设置DCF配置参数 参数说明:param_name是需要设置的参数名称,参数名称如dcf_set_param中参数param_name一致 param_value是获取的参数值,需提前分配内存 size是param_value的大小 - ___int dcf_register_after_writer(usr_cb_after_writer_t cb_func);___ 功能说明:注册leader节点写入数据成功的回调函数 参数说明:回调函数形式如下,其中stream_id是分组编号,相同编号的组成一个一致性group; index是落盘日志的index; buf是落盘的日志buf; size是落盘的日志size; key是落盘日志的key,可以唯一标识一条日志; error_no是错误码 ``` typedef int (*usr_cb_after_writer_t)(unsigned int stream_id, unsigned long long index, const char *buf, unsigned int size, unsigned long long key, int error_no); ``` - ___int dcf_register_consensus_notify(usr_cb_consensus_notify_t cb_func);___ 功能说明:注册follower节点写入数据成功的回调函数 参数说明:回调函数形式如下,参数解释同上 ``` typedef int (*usr_cb_consensus_notify_t)(unsigned int stream_id, unsigned long long index, const char *buf, unsigned int size, unsigned long long key); ``` - ___int dcf_register_status_notify(usr_cb_status_notify_t cb_func);___ 功能说明:注册节点角色变化的回调函数 参数说明:回调函数形式如下,new_role是节点新角色 ``` typedef int (*usr_cb_status_notify_t)(unsigned int stream_id, dcf_role_t new_role); ``` - ___int dcf_register_log_output(usr_cb_log_output_t cb_func);___ 功能说明:注册日志输出的回调函数 参数说明:回调函数形式如下,log_type是日志类型,LOG_RUN、LOG_DEBUG等; log_level是日志级别,LEVEL_ERROR、LEVEL_WARN等; code_file_name是代码文件名,如__FILE__; code_line_num是代码行号,如__LINE__; module_name是模块名,如"DCF"; format, ...是格式化字符串 ``` typedef void (*usr_cb_log_output_t)(int log_type, int log_level, const char *code_file_name, unsigned int code_line_num, const char *module_name, const char *format, ...); ``` - ___int dcf_register_exception_report(usr_cb_exception_notify_t cb_func);___ 功能说明:注册异常处理函数 参数说明:回调函数形式如下,dcf_exception_t异常类型,见dcf_interface.h中定义 ``` typedef int(*usr_cb_exception_notify_t)(unsigned int stream_id, dcf_exception_t exception); ``` - ___int dcf_register_election_notify(usr_cb_election_notify_t cb_func);___ 功能说明:注册选举leader变化的回调函数 参数说明:回调函数形式如下,new_leader 是新主的nodeid ``` typedef int (*usr_cb_election_notify_t)(unsigned int stream_id, unsigned int new_leader); ``` - ___int dcf_register_msg_proc(usr_cb_msg_proc_t cb_func);___ 功能说明:注册选举leader变化的回调,follower调用函数 参数说明:回调函数形式如下, ``` typedef int (*usr_cb_msg_proc_t)(unsigned int stream_id, unsigned int src_node_id, const char* msg, unsigned int msg_size); ``` - ___int dcf_start(unsigned int node_id, const char *cfg_str);___ 功能说明:启动工作线程 参数说明:node_id是节点id; cfg_str是集群节点列表,按照json字符串的格式进行配置,每个json item的配置信息包括stream_id/node_id/ip/port/role; 例如三个节点 ``` "[{ "stream_id":1, "node_id":1, "ip":"127.0.0.1", "port":1711, "role":"LEADER" },{ "stream_id":1, "node_id":2, "ip":"127.0.0.1", "port":1712, "role":"FOLLOWER" },{ "stream_id":1, "node_id":3, "ip":"127.0.0.1", "port":1713, "role":"FOLLOWER" }]" ``` - ___int dcf_write(unsigned int stream_id, const char* buffer, unsigned int length, unsigned long long key, unsigned long long *index);___ 功能说明:写入数据,仅leader节点调用 参数说明:buffer是待写入数据的buffer; length是待写入数据的size; key是待写入数据的key,可以唯一标识一条日志; index是leader分配的日志index - ___int dcf_universal_write(unsigned int stream_id, const char* buffer, unsigned int length, unsigned long long key, unsigned long long *index);___ 功能说明:写入数据,可在任意节点调用,但性能不如dcf_write。 参数说明:buffer是待写入数据的buffer; length是待写入数据的size; key是待写入数据的key,可以唯一标识一条日志; index是leader分配的日志index - ___int dcf_read(unsigned int stream_id, unsigned long long index, char *buffer, unsigned int length);___ 功能说明:查询已写入的数据,成功返回实际读到的字节数,失败返回ERROR(-1) 参数说明:参考前述 - ___int dcf_stop();___ 功能说明:停止工作线程 参数说明: - ___int dcf_truncate(unsigned int stream_id, unsigned long long first_index_kept);___ 功能说明:丢弃索引first_index_kept之前的日志 参数说明:first_index_kept是保留的第一个日志index - ___int dcf_set_applied_index(unsigned int stream_id, unsigned long long index);___ 功能说明:设置applied index,在函数dcf_start调用前调用 参数说明:index是日志index - ___int dcf_get_cluster_min_applied_idx(unsigned int stream_id, unsigned long long* index);___ 功能说明:获取集群所有节点最小的applied index 参数说明:*index是获取到的最小applied index - ___int dcf_get_leader_last_index(unsigned int stream_id, unsigned long long* index);___ 功能说明:查询leader节点的last index 参数说明:返回值index为last index - ___int dcf_get_last_index(unsigned int stream_id, unsigned long long* index);___ 功能说明:查询当前节点的last index 参数说明:返回值index为last index - ___int dcf_get_node_last_disk_index(unsigned int stream_id, unsigned int node_id, unsigned long long* index);___ 功能说明:获取node_id节点的last disk index,只可在leader调用。成功返回SUCCESS,失败返回ERROR 参数说明:*index为获取到的last disk index。 - ___int dcf_query_cluster_info(char* buffer, unsigned int length);___ 功能说明:查询集群信息,streamlist、node等 参数说明:buffer是查询信息输出空间; length是最大输出长度; 函数返回值是实际输出长度 例如三个节点的cluster查询信息: ``` { "local_node_id":1, "stream_list":[{"stream_id":1,"local_node_id":1,"role":"FOLLOWER","term":3,"work_mode":0, "applied_index":0,"commit_index":0,"first_index":1,"last_index":5733936, "leader_id":3,"leader_ip":"127.0.0.1","leader_port":1713, "nodes":[{"node_id":1,"ip":"127.0.0.1","port":1711,"role":"FOLLOWER"}, {"node_id":2,"ip":"127.0.0.1","port":1712,"role":"FOLLOWER"}, {"node_id":3,"ip":"127.0.0.1","port":1713,"role":"LEADER"}] }] } ``` - ___int dcf_query_stream_info(unsigned int stream_id, char *buffer, unsigned int length);___ 功能说明:查询stream信息 参数说明:stream_id是待查询stream的id; buffer是查询信息输出空间; length是最大输出长度; 函数返回值是实际输出长度 例如三个节点的stream查询信息: ``` { "stream_id":1,"local_node_id":3,"role":"FOLLOWER","term":2,"work_mode":0, "applied_index":0,"commit_index":0,"first_index":1,"last_index":0, "leader_id":2,"leader_ip":"127.0.0.1","leader_port":1712, "nodes":[{"node_id":1,"ip":"127.0.0.1","port":1711,"role":"FOLLOWER"}, {"node_id":2,"ip":"127.0.0.1","port":1712,"role":"LEADER"}, {"node_id":3,"ip":"127.0.0.1","port":1713,"role":"FOLLOWER"}] } ``` - ___int dcf_query_leader_info(unsigned int stream_id, char *ip, unsigned int ip_len, unsigned int *port, unsigned int *node_id);___ 功能说明:查询leader信息 参数说明:ip是输出leader ip的buffer; ip_len是ip buffer长度; port输出leader的port; node_id输出leader的node_id - ___int dcf_get_errorno();___ 功能说明:获取错误码 参数说明: - ___const char* dcf_get_error(int code);___ 功能说明:获取错误信息 参数说明:code错误码 - ___const char *dcf_get_version();___ 功能说明:获取版本信息 参数说明: - ___int dcf_add_member(unsigned int stream_id, unsigned int node_id, const char *ip, unsigned int port, dcf_role_t role, unsigned int wait_timeout_ms);___ 功能说明:添加节点,只可在leader调用。成功返回SUCCESS(0),失败返回ERROR(-1),超时返回TIMEOUT(1),超时最终也可能成功,可以重试。 参数说明:node_id是待添加节点id; ip是待添加节点ip; port是待添加节点port,调用者需保证port可用; role是待添加节点角色; wait_timeout_ms是超时时间,单位ms - ___int dcf_remove_member(unsigned int stream_id, unsigned int node_id, unsigned int wait_timeout_ms);___ 功能说明:删除节点,只可在leader调用。成功返回SUCCESS(0),失败返回ERROR(-1),超时返回TIMEOUT(1),超时最终也可能成功,可以重试。 参数说明:node_id是待删除节点id; wait_timeout_ms是超时时间,单位ms - ___int dcf_change_member_role(unsigned int stream_id, unsigned int node_id, dcf_role_t new_role, unsigned int wait_timeout_ms);___ 功能说明:改变节点角色,在leader调用可改变其他节点角色,非leader调用只能改变节点自身的角色。成功返回SUCCESS(0),失败返回ERROR(-1),超时返回TIMEOUT(1),超时最终也可能成功,可以重试。 参数说明:node_id为被修改角色节点id; new_role是节点新角色。 - ___int dcf_change_member(const char *change_str, unsigned int wait_timeout_ms);___ 功能说明:改变节点属性,在leader调用可改变其他节点的role/group/priority等属性,非leader调用只能改变节点自身的属性,一次可改变一个或多个属性。成功返回SUCCESS(0),失败返回ERROR(-1),超时返回TIMEOUT(1),超时最终也可能成功,可以重试。 参数说明:change_str是需要修改的节点及属性列表,按照json字符串的格式进行配置,例如[{"stream_id":1,"node_id":1,"group":1,"priority":5,"role":"FOLLOWER"}]。 - ___int dcf_promote_leader(unsigned int stream_id, unsigned int node_id, unsigned int wait_timeout_ms);___ 功能说明:推选指定节点为leader。在leader调用可推选其他节点,在follower节点调用只能推选自己。失败返回ERROR(-1),成功返回SUCCESS(0),返回SUCCESS仅代表推选命令下发成功,最终能否成功需要调用者查询。 参数说明:node_id为被推选节点id; wait_timeout_ms是超时时间,单位ms,为0表示不阻塞leader直接发起推选。 - ___int dcf_timeout_notify(unsigned int stream_id, unsigned int node_id);___ 功能说明:外部触发超时 参数说明:stream_id≠0表示触发指定stream_id超时,stream_id=0表示触发所有stream_id超时 ___int int dcf_set_work_mode(unsigned int stream_id, dcf_work_mode_t work_mode, unsigned int vote_num);___ 功能说明:设置运行模式(正常、少数派) 参数说明:work_mode为正常或少数派,如果是少数派模式,需指定票数。 - ___int dcf_query_statistics_info(char *buffer, unsigned int length);___ 功能说明:获取统计信息,需要日志级别开启PROFILE。 参数说明:buffer是查询信息输出空间; length是最大输出长度 - ___int dcf_check_if_all_logs_applied(unsigned int stream_id, unsigned int *all_applied);___ 功能说明:一般在节点升主时使用,检查当前欲升主节点的dcf日志是否都完成apply。调用成功返回SUCCESS,失败返回ERROR,调用成功后可从*all_applied获取结果。 参数说明:*all_applied为获取到的结果,0表示日志没有都完成apply,非0表示日志都完成apply。 - ___int dcf_send_msg(unsigned int stream_id, unsigned int dest_node_id, const char* msg, unsigned int msg_size);___ 功能说明:用于节点间对指定节点发送消息。调用成功返回SUCCESS,失败返回ERROR。 参数说明:dest_node_id为指定节点,msg表示待发送的消息,msg_size表示消息大小。 - ___int dcf_broadcast_msg(unsigned int stream_id, const char* msg, unsigned int msg_size);___ 功能说明:用于对除当前节点外所有节点广播发送消息。调用成功返回SUCCESS,失败返回ERROR。 参数说明:msg表示待发送的消息,msg_size表示消息大小。 - ___int dcf_pause_rep(unsigned int stream_id, unsigned int node_id, unsigned int time_us);___ 功能说明:对指定节点暂停日志复制。调用成功返回SUCCESS,失败返回ERROR。 参数说明:node_id指定暂停的节点; time_us是暂停时间(不超过1s),单位us。 - ___int dcf_demote_follower(unsigned int stream_id);___ 功能说明:对主节点进行降备 参数说明:stream_id对应降备的stream。 - ___int dcf_get_last_commit_index(unsigned int stream_id, unsigned int is_consensus, unsigned long long* index);___ 功能说明:获取最新 commit index 值 参数说明:stream_id对应群组id,默认为1; is_consensus,是否要求一致性(true, false); index,出参commit index。 - ___int dcf_get_current_term_and_role(unsigned int stream_id, unsigned long long* term, dcf_role_t* role);___ 功能说明:获取自己当前的任期和角色信息。 参数说明:失败返回ERROR(-1),成功返回SUCCESS(0)。返回SUCCESS时可以从出参term获取任期,从出参role获取角色。 - ___int int dcf_set_election_priority(unsigned int stream_id, unsigned long long priority);___ 功能说明:设置节点的选举优先级。频繁调用该接口时内部有保护,1s内只能设置成功一次。 参数说明:priority是需要设置的优先级值。 - ___void dcf_set_timer(void *timer);___ 功能说明:注册上层组件的timer给DCF使用,timer需要与DCF内部gs_timer_t结构一致,一般内部组件间使用。 参数说明:timer是上层组件timer地址。 ##### 2、DEMO示例 参见:DCF/test/test_main目录 test_main.c的main函数包含两个参数,分别是Node Id和一个包含了集群配置信息的JSON字符串。其参数内容和意义均和前文中提到的`dcf_start` API一致。 示例运行命令:(假设编译完成后,对应的可执行文件名为`test_main`) ``` # 节点1 ./test_main 1 '[{"stream_id":1,"node_id":1,"ip":"127.0.0.1","port":1711,"role":"LEADER"},{"stream_id":1,"node_id":2,"ip":"127.0.0.1","port":1712,"role":"FOLLOWER"},{"stream_id":1,"node_id":3,"ip":"127.0.0.1","port":1713,"role":"FOLLOWER"}]' # 节点2 ./test_main 2 '[{"stream_id":1,"node_id":1,"ip":"127.0.0.1","port":1711,"role":"LEADER"},{"stream_id":1,"node_id":2,"ip":"127.0.0.1","port":1712,"role":"FOLLOWER"},{"stream_id":1,"node_id":3,"ip":"127.0.0.1","port":1713,"role":"FOLLOWER"}]' # 节点3 ./test_main 3 '[{"stream_id":1,"node_id":1,"ip":"127.0.0.1","port":1711,"role":"LEADER"},{"stream_id":1,"node_id":2,"ip":"127.0.0.1","port":1712,"role":"FOLLOWER"},{"stream_id":1,"node_id":3,"ip":"127.0.0.1","port":1713,"role":"FOLLOWER"}]' ``` #### 四、测试工程 ##### 1、编译 ##### 2、执行测试用例 待续... #### 五、应用实例 ##### 1、GaussDB(for openGauss)使能paxos特性实践 具体可参考:https://gitee.com/opengauss/blog/blob/master/content/zh/post/yanghaiyan/openGauss%E4%BD%BF%E8%83%BDpaxos%E7%89%B9%E6%80%A7%E5%AE%9E%E8%B7%B5.md