diff --git a/plugin/uwal/CMakeLists.txt b/plugin/uwal/CMakeLists.txt index 537ef3d2a65d7559944850a6884e484eafb1f2d3..b0c59170f3107cfe26bbcb784e329989d1a1e8ac 100644 --- a/plugin/uwal/CMakeLists.txt +++ b/plugin/uwal/CMakeLists.txt @@ -8,6 +8,7 @@ MYSQL_ADD_PLUGIN(uwal_master uwal_adaptor.cc uwal_source_plugin.cc uwal_source.cc + uwal_common.cc MODULE_ONLY MODULE_OUTPUT_NAME "uwal_master" VISIBILITY_HIDDEN @@ -16,7 +17,8 @@ MYSQL_ADD_PLUGIN(uwal_master MYSQL_ADD_PLUGIN(uwal_replica uwal_adaptor.cc uwal_replica_plugin.cc - uwal_source.cc + uwal_replica.cc + uwal_common.cc MODULE_ONLY MODULE_OUTPUT_NAME "uwal_replica" VISIBILITY_HIDDEN diff --git a/plugin/uwal/uwal_common.cc b/plugin/uwal/uwal_common.cc new file mode 100644 index 0000000000000000000000000000000000000000..646c0113f3c83090ed3e637d4ddd38112e160fa1 --- /dev/null +++ b/plugin/uwal/uwal_common.cc @@ -0,0 +1,126 @@ +#include +#include + +#include "my_byteorder.h" +#include "my_compiler.h" +#include "my_systime.h" +#include "sql/mysqld.h" +#include "plugin/uwal/uwal_common.h" + +bool uwal_enabled = false; +unsigned long uwal_disk_size = 0; +unsigned int uwal_id = 0; +unsigned int uwal_port = 9991; +char *uwal_ip = nullptr; +char *uwal_protocol = nullptr; +char *uwal_devices_path = nullptr; +char *uwal_log_path = nullptr; + +void InsertConfigElem(UwalCfgElem *elems, char *name, const char *value, int index) +{ + elems[index].substr = name; + elems[index].value = value; +} + +int UwalBase::UwalInit() +{ + SetEnabled(uwal_enabled); + SetNodeId(uwal_id); + UwalCfgElem elem[UWAL_ELEM_NUM]; + int index = 0; + + SetIP(uwal_ip); + InsertConfigElem(elem, "ock.uwal.ip", GetIP(), index++); + + SetPort(uwal_port); + std::string portStr = std::to_string(GetPort()); + InsertConfigElem(elem, "ock.uwal.port", portStr.c_str(), index++); + + SetProtocol(uwal_protocol); + InsertConfigElem(elem, "ock.uwal.protocol", GetProtocolStr(), index++); + + InsertConfigElem(elem, "ock.uwal.disk.poolid", UWAL_DISK_POOLID, index++); + + std::string diskSizeStr = std::to_string(uwal_disk_size); + InsertConfigElem(elem, "ock.uwal.disk.size", diskSizeStr.c_str(), index++); + + InsertConfigElem(elem, "ock.uwal.disk.min.block", "2147483648", index++); + InsertConfigElem(elem, "ock.uwal.disk.max.block", "2147483648", index++); + InsertConfigElem(elem, "ock.uwal.devices.path", uwal_devices_path, index++); + + InsertConfigElem(elem, "ock.uwal.rpc.worker.thread.num", "4", index++); + InsertConfigElem(elem, "ock.uwal.rpc.timeout", "30000", index++); + InsertConfigElem(elem, "ock.uwal.rpc.compression.switch", "false", index++); + InsertConfigElem(elem, "ock.uwal.rpc.flowcontrol.switch", "false", index++); + InsertConfigElem(elem, "ock.uwal.rpc.flowcontrol.value", "128", index++); + + if (GetProtocol() == NET_PROTOCOL_RDMA) { + InsertConfigElem(elem, "ock.uwal.rpc.rndv.switch", "true", index++); + } + + InsertConfigElem(elem, "ock.uwal.devices.split.switch", "true", index++); + InsertConfigElem(elem, "ock.uwal.devices.split.size", "2147483648", index++); + InsertConfigElem(elem, "ock.uwal.devices.split.path", uwal_devices_path, index++); + InsertConfigElem(elem, "ock.uwal.devices.split.mysql.switch", "true", index++); + InsertConfigElem(elem, "ock.uwal.devices.split.mysql.prefix", "mysql-bin", index++); + + return ock_uwal_init(NULL, elem, index, uwal_log_path); +} + +void UwalBase::GetLocalStateInfo(NodeStateInfo *nodeStateInfo) +{ + nodeStateInfo->nodeId = GetNodeId(); + nodeStateInfo->state = NODE_STATE_UP; + nodeStateInfo->groupId = 0; + nodeStateInfo->groupLevel = 0; + + NetInfo netInfo; + netInfo.ipv4Addr = ock_uwal_ipv4_inet_to_int(GetIP()); + netInfo.port = GetPort(); + netInfo.protocol = GetProtocol(); + + NetList netList; + netList.num = 1; + netList.list[0] = netInfo; + nodeStateInfo->netList = netList; +} + +void UwalNotifyCallback(void *ctx, int ret) +{ + CBParams *cbParams = (CBParams *)ctx; + pthread_mutex_lock(&cbParams->mutex); + cbParams->cbResult = true; + cbParams->ret = ret; + pthread_mutex_unlock(&cbParams->mutex); + pthread_cond_signal(&cbParams->cond); +} + +int UwalSyncNotify(NodeStateList *nodeList) +{ + CBParams cbParams = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, false, 0}; + pthread_mutex_lock(&cbParams.mutex); + int ret = ock_uwal_notify_nodelist_change(nodeList, UwalNotifyCallback, (void *)&cbParams); + if (ret != 0) { + pthread_mutex_unlock(&cbParams.mutex); + return ret; + } + uint16 count = 0; + while (!cbParams.cbResult) { + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += 3; + ts.tv_nsec = 0; + int condWaitRet = 0; + condWaitRet = pthread_cond_timedwait(&cbParams.cond, &cbParams.mutex, &ts); + if (condWaitRet == ETIMEDOUT) { + ++count; + LogErr(SYSTEM_LEVEL, ER_UWAL_NOTIFY_WAIT, count); + } + } + pthread_mutex_unlock(&cbParams.mutex); + bool success = (cbParams.ret == 0); + if (success) { + LogErr(SYSTEM_LEVEL, ER_UWAL_NOTIFY_SUCCESS); + } + return success ? 0 : -1; +} \ No newline at end of file diff --git a/plugin/uwal/uwal_common.h b/plugin/uwal/uwal_common.h new file mode 100644 index 0000000000000000000000000000000000000000..454d5466f46bb849e0fa3c3b8870be60b08ec1cc --- /dev/null +++ b/plugin/uwal/uwal_common.h @@ -0,0 +1,164 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2022-2024. All rights reserved. + */ + +#ifndef __UWAL_COMMON_H__ +#define __UWAL_COMMON_H__ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "mysql/plugin.h" +#include "sql/derror.h" // ER_THD +#include "mysqld_error.h" +#include "plugin/uwal/uwal_adaptor.h" + +#define UWAL_ELEM_NUM 100 + +extern bool uwal_enabled; +extern unsigned long uwal_disk_size; +extern unsigned int uwal_id; +extern unsigned int uwal_port; +extern char *uwal_ip; +extern char *uwal_protocol; +extern char *uwal_devices_path; +extern char *uwal_log_path; + +typedef struct CBParams { + pthread_mutex_t mutex; + pthread_cond_t cond; + bool cbResult; + int ret; +} CBParams; + +typedef struct UwalObjNodeInfo { + uint32_t id; + char ip[UWAL_IP_LEN]; + NetProtocol protocol; +} UwalObjNodeInfo; + +class UwalSpinLock { +public: + UwalSpinLock() = default; + ~UwalSpinLock() = default; + + UwalSpinLock(const UwalSpinLock &) = delete; + UwalSpinLock &operator = (const UwalSpinLock &) = delete; + UwalSpinLock(UwalSpinLock &&) = delete; + UwalSpinLock &operator = (UwalSpinLock &&) = delete; + + inline void TryLock() + { + mFlag.test_and_set(std::memory_order_acquire); + } + + inline void Lock() + { + while (mFlag.test_and_set(std::memory_order_acquire)) { + } + } + + inline void Unlock() + { + mFlag.clear(std::memory_order_release); + } + +private: + std::atomic_flag mFlag = ATOMIC_FLAG_INIT; +}; + +class UwalBase { +public: + UwalBase() = default; + ~UwalBase() = default; + + void SetEnabled(bool value) + { + enabled = value; + } + + bool GetEnabled() + { + return enabled; + } + + void SetNodeId(uint32_t value) + { + id = value; + } + + uint32_t GetNodeId() + { + return id; + } + + void SetPort(uint32_t value) + { + port = value; + } + + uint32_t GetPort() + { + return port; + } + + void SetIP(char *value) + { + memcpy(ip, value, strlen(value) + 1); + } + + char *GetIP() + { + return ip; + } + + void SetProtocol(char *value) + { + if (!strcasecmp(UWAL_PROTOCOL_RDMA, value)) { + protocol = NET_PROTOCOL_RDMA; + } + } + + NetProtocol GetProtocol() + { + return protocol; + } + + char *GetProtocolStr() + { + if (protocol == NET_PROTOCOL_RDMA) { + return UWAL_PROTOCOL_RDMA; + } + return UWAL_PROTOCOL_TCP; + } + + int UwalInit(); + + void GetLocalStateInfo(NodeStateInfo *nodeStateInfo); + +private: + bool enabled = false; + uint32_t id = 0; + uint32_t port = 0; + NetProtocol protocol = NET_PROTOCOL_TCP; + int index = 0; + char ip[UWAL_IP_LEN]; +}; + +void UwalNotifyCallback(void *ctx, int ret); + +int UwalSyncNotify(NodeStateList *nodeList); + +#endif \ No newline at end of file diff --git a/plugin/uwal/uwal_replica.cc b/plugin/uwal/uwal_replica.cc new file mode 100644 index 0000000000000000000000000000000000000000..7be08bc74e176fa10ca5927211827b549c72dbdd --- /dev/null +++ b/plugin/uwal/uwal_replica.cc @@ -0,0 +1,51 @@ +#include "plugin/uwal/uwal_replica.h" + +unsigned int uwal_master_id = 0; +char *uwal_master_ip = nullptr; +char *uwal_master_protocol = nullptr; + +int UwalReplica::UwalInitNotify() +{ + NodeStateList *nodeList = (NodeStateList *)malloc(sizeof(NodeStateList) + sizeof(NodeStateInfo)); + nodeList->nodeNum = 1; + nodeList->localNodeId = GetNodeId(); + nodeList->masterNodeId = NODE_ID_INVALID; + + // local state info + NodeStateInfo localStateInfo; + GetLocalStateInfo(&localStateInfo); + nodeList->nodeList[0] = localStateInfo; + + int ret = UwalSyncNotify(nodeList); + free(nodeList); + return ret; +} + +int UwalReplica::UwalRegisterMaster() +{ + LogErr(SYSTEM_LEVEL, ER_UWAL_REPLICA_NOTIFY); + NodeStateList *nodeList = (NodeStateList *)malloc(sizeof(NodeStateList) + sizeof(NodeStateInfo) * 2); + nodeList->nodeNum = 2; + nodeList->localNodeId = GetNodeId(); + nodeList->masterNodeId = masterInfo.id; + + NodeStateInfo localStateInfo; + GetLocalStateInfo(&localStateInfo); + nodeList->nodeList[0] = localStateInfo; + + nodeList->nodeList[1].groupId = 0; + nodeList->nodeList[1].groupLevel = 0; + nodeList->nodeList[1].state = NODE_STATE_UP; + nodeList->nodeList[1].nodeId = masterInfo.id; + nodeList->nodeList[1].netList.num = 1; + nodeList->nodeList[1].netList.list[0].port = GetPort(); + nodeList->nodeList[1].netList.list[0].ipv4Addr =ock_uwal_ipv4_inet_to_int(masterInfo.ip); + nodeList->nodeList[1].netList.list[0].protocol = NET_PROTOCOL_TCP; + if (GetProtocol() == NET_PROTOCOL_RDMA && masterInfo.protocol == NET_PROTOCOL_RDMA) { + nodeList->nodeList[1].netList.list[0].protocol = NET_PROTOCOL_RDMA; + } + + int ret = UwalSyncNotify(nodeList); + free(nodeList); + return ret; +} \ No newline at end of file diff --git a/plugin/uwal/uwal_replica.h b/plugin/uwal/uwal_replica.h new file mode 100644 index 0000000000000000000000000000000000000000..a93ac5fa556cad2e086b3d0d84ee796280745e3e --- /dev/null +++ b/plugin/uwal/uwal_replica.h @@ -0,0 +1,43 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2022-2024. All rights reserved. + */ + +#ifndef __UWAL_REPLICA_H__ +#define __UWAL_REPLICA_H__ + +#include "plugin/uwal/uwal_common.h" + +extern unsigned int uwal_master_id; +extern char *uwal_master_ip; +extern char *uwal_master_protocol; + +class UwalReplica : public UwalBase { +public: + UwalReplica() + { + SetMasterInfo(); + } + + ~UwalReplica() = default; + + void SetMasterInfo() + { + if (uwal_master_ip == nullptr) { + return; + } + masterInfo.id = uwal_master_id; + memcpy(masterInfo.ip, uwal_master_ip, strlen(uwal_master_ip) + 1); + if (!strcasecmp(UWAL_PROTOCOL_RDMA, uwal_master_protocol)) { + masterInfo.protocol = NET_PROTOCOL_RDMA; + } + } + + int UwalInitNotify(); + + int UwalRegisterMaster(); + +private: + UwalObjNodeInfo masterInfo; +}; + +#endif \ No newline at end of file diff --git a/plugin/uwal/uwal_replica_plugin.cc b/plugin/uwal/uwal_replica_plugin.cc index 15d647c4be63b39be9d5ec374cf28876b625851c..2f7bd8e9aeeadb48ff4c53b0cd302424c93145b9 100644 --- a/plugin/uwal/uwal_replica_plugin.cc +++ b/plugin/uwal/uwal_replica_plugin.cc @@ -12,7 +12,7 @@ #include "sql/sql_class.h" #include "sql/sql_lex.h" #include "typelib.h" -#include "plugin/uwal/uwal_source.h" +#include "plugin/uwal/uwal_replica.h" #include "sql/replication.h" #define PLUGIN_AUTHOR_HUAWEI "Huawei Technologies Co., Ltd." @@ -22,7 +22,7 @@ static SERVICE_TYPE(registry) *reg_srv = nullptr; SERVICE_TYPE(log_builtins) *log_bi = nullptr; SERVICE_TYPE(log_builtins_string) *log_bs = nullptr; -UwalObject *uwalReplicaObj = nullptr; +UwalReplica *uwalReplica = nullptr; static MYSQL_SYSVAR_BOOL( enabled, uwal_enabled, PLUGIN_VAR_OPCMDARG, @@ -134,51 +134,9 @@ static SYS_VAR *uwal_replica_system_vars[] = { nullptr, }; -int UwalStandbyInitNotify() -{ - NodeStateList *nodeList = (NodeStateList *)malloc(sizeof(NodeStateList) + sizeof(NodeStateInfo)); - nodeList->nodeNum = 1; - nodeList->localNodeId = uwalReplicaObj->GetNodeId(); - nodeList->masterNodeId = NODE_ID_INVALID; - - // local state info - NodeStateInfo localStateInfo; - uwalReplicaObj->GetLocalStateInfo(&localStateInfo); - nodeList->nodeList[0] = localStateInfo; - - int ret = UwalSyncNotify(nodeList); - free(nodeList); - return ret; -} - static int uwal_binlog_relay_after_register() { - LogPluginErr(SYSTEM_LEVEL, ER_UWAL_REPLICA_NOTIFY); - NodeStateList *nodeList = (NodeStateList *)malloc(sizeof(NodeStateList) + sizeof(NodeStateInfo) * 2); - nodeList->nodeNum = 2; - nodeList->localNodeId = uwalReplicaObj->GetNodeId(); - nodeList->masterNodeId = uwalReplicaObj->master_info.node_id; - - NodeStateInfo localStateInfo; - uwalReplicaObj->GetLocalStateInfo(&localStateInfo); - nodeList->nodeList[0] = localStateInfo; - - nodeList->nodeList[1].groupId = 0; - nodeList->nodeList[1].groupLevel = 0; - nodeList->nodeList[1].state = NODE_STATE_UP; - nodeList->nodeList[1].nodeId = uwalReplicaObj->master_info.node_id; - nodeList->nodeList[1].netList.num = 1; - nodeList->nodeList[1].netList.list[0].port = uwalReplicaObj->GetListenPort(); - nodeList->nodeList[1].netList.list[0].ipv4Addr =ock_uwal_ipv4_inet_to_int(uwalReplicaObj->master_info.node_ip); - nodeList->nodeList[1].netList.list[0].protocol = NET_PROTOCOL_TCP; - if (uwalReplicaObj->GetProtocol() == NET_PROTOCOL_RDMA - && uwalReplicaObj->master_info.protocol == NET_PROTOCOL_RDMA) { - nodeList->nodeList[1].netList.list[0].protocol = NET_PROTOCOL_RDMA; - } - - int ret = UwalSyncNotify(nodeList); - free(nodeList); - return ret; + return uwalReplica->UwalRegisterMaster(); } uint64_t uwal_read_offset = 4; @@ -294,6 +252,14 @@ static int uwal_binlog_relay_is_uwal_relay(int &uwal_skip) return 0; } +static void ReleaseUwalReplica() +{ + if (uwalReplica != nullptr) { + delete uwalReplica; + uwalReplica = nullptr; + } +} + Binlog_relay_uwal_observer uwal_relay_observer = { sizeof(Binlog_relay_uwal_observer), // len @@ -307,7 +273,7 @@ static int uwal_replica_plugin_init(void *p) // Initialize error logging service. if (init_logging_service_for_plugin(®_srv, &log_bi, &log_bs)) return 1; - uwalReplicaObj = new UwalObject(); + uwalReplica = new UwalReplica(); // dlopen uwal here int ret = uwal_init_symbols(); @@ -315,22 +281,23 @@ static int uwal_replica_plugin_init(void *p) return 1; } - ret = uwalReplicaObj->UwalInit(); + ret = uwalReplica->UwalInit(); if (ret != 0) { + ReleaseUwalReplica(); return 1; } - uwalReplicaObj->SetMasterNode(false); - uwalReplicaObj->SetMasterInfo(); // notify - ret = UwalStandbyInitNotify(); + ret = uwalReplica->UwalInitNotify(); if (ret != 0) { LogPluginErr(ERROR_LEVEL, ER_UWAL_REPLICA_NOTIFY_FAILED, ret); + ReleaseUwalReplica(); return 1; } // register observer if (register_binlog_relay_uwal_observer(&uwal_relay_observer, p)) { + ReleaseUwalReplica(); return 1; } @@ -340,7 +307,7 @@ static int uwal_replica_plugin_init(void *p) static int uwal_replica_plugin_deinit(void *p) { // the plugin was not initialized, thre is nothing to do here - delete uwalReplicaObj; + ReleaseUwalReplica(); // unregister observer if (unregister_binlog_relay_uwal_observer(&uwal_relay_observer, p)) { diff --git a/plugin/uwal/uwal_source.cc b/plugin/uwal/uwal_source.cc index 7f6440244997d0156c09dff41df711bbaea575f0..9540fde01092f826ebdc8295e6edef975f711901 100644 --- a/plugin/uwal/uwal_source.cc +++ b/plugin/uwal/uwal_source.cc @@ -7,15 +7,6 @@ #include "sql/mysqld.h" #include "plugin/uwal/uwal_source.h" -bool uwal_enabled = false; -unsigned long uwal_disk_size = 0; -unsigned int uwal_id = 0; -unsigned int uwal_port = 9991; -char *uwal_ip = nullptr; -char *uwal_protocol = nullptr; -char *uwal_devices_path = nullptr; -char *uwal_log_path = nullptr; - unsigned int uwal_buffer_aligned_size = 262144; bool uwal_sync_append = false; @@ -23,135 +14,157 @@ unsigned int uwal_replica_id = 1; char *uwal_replica_ip = nullptr; char *uwal_replica_protocol = nullptr; -unsigned int uwal_master_id = 0; -char *uwal_master_ip = nullptr; -char *uwal_master_protocol = nullptr; - -void UwalObject::UwalValueInit() +int UwalSource::UwalSourceInitNotify() { - LogErr(INFORMATION_LEVEL, ER_UWAL_PATH, uwal_devices_path); - LogErr(INFORMATION_LEVEL, ER_UWAL_LOG_PATH, uwal_log_path); - LogErr(INFORMATION_LEVEL, ER_UWAL_DISK_SIZE, uwal_disk_size); - LogErr(INFORMATION_LEVEL, ER_UWAL_ID, uwal_id); - LogErr(INFORMATION_LEVEL, ER_UWAL_IP, uwal_ip); - LogErr(INFORMATION_LEVEL, ER_UWAL_PORT, uwal_port); - LogErr(INFORMATION_LEVEL, ER_UWAL_PROTOCOL, uwal_protocol); - - SetEnabled(uwal_enabled); - SetDiskSize(uwal_disk_size); - SetNodeId(uwal_id); - SetListenPort(uwal_port); - SetNodeIP(uwal_ip); - SetProtocol(uwal_protocol); - SetDevicesPath(uwal_devices_path); - SetLogPath(uwal_log_path); - - enableSyncAppend = uwal_sync_append; - alignedSize = uwal_buffer_aligned_size; + NodeStateInfo sourceStateInfo; + GetLocalStateInfo(&sourceStateInfo); + + NodeStateList *nodeList = (NodeStateList *)malloc(sizeof(NodeStateList) + sizeof(NodeStateInfo)); + nodeList->localNodeId = GetNodeId(); + nodeList->masterNodeId = GetNodeId(); + nodeList->nodeNum = 1; + nodeList->nodeList[0] = sourceStateInfo; + + int ret = UwalSyncNotify(nodeList); + free(nodeList); + return ret; } -int UwalObject::UwalInit() +int UwalSource::UwalWrite(uint64_t *outOffset, UwalCallBack *uwalCB, UwalNodeInfo *infos) { - UwalValueInit(); - int index = 0; - UwalInitConfig("ock.uwal.ip", node_ip, index++); - - char *listen_port_str = ConvertIntegerToString(listen_port, UWAL_PORT_LEN); - UwalInitConfig("ock.uwal.port", listen_port_str, index++); - UwalInitConfigProtocol(index++); - UwalInitConfig("ock.uwal.disk.poolid", UWAL_DISK_POOLID, index++); + UwalBuffer buffers[1] = {{uwalBuffer, bufferOffset}}; + UwalBufferList bufferList = {1, buffers}; + UwalAppendParam appendParam = {activeUwalId, &bufferList, uwalCB}; + int ret = ock_uwal_append_copy_free(&appendParam, outOffset, infos); + return ret; +} - char *disk_size_str = ConvertIntegerToString(disk_size, UWAL_INT_SIZE_LEN); - UwalInitConfig("ock.uwal.disk.size", disk_size_str, index++); - UwalInitConfig("ock.uwal.disk.min.block", "2147483648", index++); - UwalInitConfig("ock.uwal.disk.max.block", "2147483648", index++); +int UwalSource::UwalWriteSync() +{ + UwalNodeInfo *infos = (UwalNodeInfo *)malloc(sizeof(UwalNodeInfo) + MAX_NODE * sizeof(UwalNodeStatus)); + uint64_t offset = 0; - UwalInitConfig("ock.uwal.devices.path", devices_path, index++); - if (enableSyncAppend) { - UwalInitConfig("ock.uwal.rpc.worker.thread.num", "1", index++); - } else { - UwalInitConfig("ock.uwal.rpc.worker.thread.num", "4", index++); + int ret = UwalWrite(&offset, nullptr, infos); + if (ret != 0) { + LogErr(WARNING_LEVEL, ER_UWAL_SYNC_WRITE_FAILED, ret); } + RefreshBuffer(true); + free(infos); + return ret; +} - UwalInitConfig("ock.uwal.rpc.timeout", "30000", index++); - if (protocol == NET_PROTOCOL_RDMA) { - UwalInitConfig("ock.uwal.rpc.rndv.switch", "true", index++); - } +void UwalWriteAsyncCallBack(void *cbCtx, int retCode) +{ + UwalAsyncCbCtx *curCbCtx = (UwalAsyncCbCtx *)cbCtx; + uwalSource->IncreaseSyncIndex(); - UwalInitConfig("ock.uwal.rpc.compression.switch", "false", index++); - UwalInitConfig("ock.uwal.rpc.flowcontrol.switch", "false", index++); - UwalInitConfig("ock.uwal.rpc.flowcontrol.value", "128", index++); - UwalInitConfig("ock.uwal.devices.split.switch", "true", index++); - UwalInitConfig("ock.uwal.devices.split.size", "2147483648", index++); - UwalInitConfig("ock.uwal.devices.split.path", devices_path, index++); - UwalInitConfig("ock.uwal.devices.split.mysql.switch", "true", index++); - UwalInitConfig("ock.uwal.devices.split.mysql.prefix", "mysql-bin", index++); + free(curCbCtx->info); + free(curCbCtx); +} - int ret = ock_uwal_init(NULL, elem, index, log_path); +int UwalSource::UwalWriteAsync() +{ + UwalAsyncCbCtx *curCbCtx = (UwalAsyncCbCtx *)malloc(sizeof(UwalAsyncCbCtx)); + UwalNodeInfo *infos = (UwalNodeInfo *)malloc(sizeof(UwalNodeInfo) + MAX_NODE * sizeof(UwalNodeStatus)); + curCbCtx->info = infos; + UwalCallBack uwalCB = {UwalWriteAsyncCallBack, curCbCtx}; + int ret = UwalWrite(&(curCbCtx->outOffset), &uwalCB, infos); + if (ret != 0) { + // TODO: handle write error + LogErr(WARNING_LEVEL, ER_UWAL_SYNC_WRITE_FAILED, ret); + flushIndex -= 1; + } else { + flushIndex += 1; + } + RefreshBuffer(true); return ret; } -int UwalObject::UwalWrite(UwalId *id, int nBytes, char *buf, UwalNodeInfo *infos) +int UwalSource::UwalBinlogWrite(const unsigned char *buffer, my_off_t length) { - UwalBuffer uBuff; - UwalBufferList buffList; - uint64_t offset; - UwalAppendParam params; + int ret = 0; + RefreshBuffer(false); - uBuff.buf = buf; - uBuff.len = nBytes; - buffList.buffers = &uBuff; - buffList.cnt = 1; + if (bufferOffset + length < alignedSize) { + memcpy(uwalBuffer + bufferOffset, buffer, length); + bufferOffset += length; + return 0; + } - params.bufferList = &buffList; - params.uwalId = id; - params.cb = NULL; + // copy align + uint64_t tmpOffset = alignedSize - bufferOffset; + memcpy(uwalBuffer + bufferOffset, buffer, tmpOffset); + bufferOffset += tmpOffset; + ret = writer(); + + while ((length - tmpOffset) >= alignedSize) { + memcpy(uwalBuffer, buffer + tmpOffset, alignedSize); + bufferOffset = alignedSize; + tmpOffset += alignedSize; + ret = writer(); + } + if (length - tmpOffset > 0) { + memcpy(uwalBuffer, buffer + tmpOffset, length - tmpOffset); + bufferOffset = length - tmpOffset; + } - int ret = ock_uwal_append(¶ms, &offset, infos); return ret; } -int UwalObject::UwalWriteCopyFree(UwalId *id, int nBytes, char *buf, UwalNodeInfo *infos) +void UwalSource::RegisterBinlogWriter() { - UwalBuffer uBuff; - UwalBufferList buffList; - uint64_t offset; - UwalAppendParam params; - - uBuff.buf = buf; - uBuff.len = nBytes; - buffList.buffers = &uBuff; - buffList.cnt = 1; + if (enableSyncAppend) { + writer = std::bind(&UwalSource::UwalWriteSync, this); + } else { + writer = std::bind(&UwalSource::UwalWriteAsync, this); + } +} - params.bufferList = &buffList; - params.uwalId = id; - params.cb = NULL; +int UwalSource::UwalBinlogFlush() +{ + int ret = 0; + if (hasBuffer && bufferOffset > 0) { + ret = writer(); + } - int ret = ock_uwal_append_copy_free(¶ms, &offset, infos); + // update sync point + UpdateSyncPoint(); return ret; } -int UwalObject::UwalWriteAsyncCopyFree(UwalId *id, UwalCallBack *uwalCB, UwalNodeInfo *infos) +int UwalSource::UwalRotate(uint64_t timeLine) { - UwalBuffer buffers[1] = {{uwalBuffer, bufferOffset}}; - UwalBufferList bufferList = {1, buffers}; - UwalAppendParam appendParam = {id, &bufferList, uwalCB}; - int ret = ock_uwal_append_copy_free(&appendParam, &(((UwalAsyncCbCtx *)uwalCB->cbCtx)->outOffset), infos); + if (hasActiveUwal) { + // move curUwal to truncate list + truncateListLock.Lock(); + readyTruncateList.emplace_back(activeUwalId); + truncateListLock.Unlock(); + activeUwalId = nullptr; + hasActiveUwal = false; + } + truncateListLock.Lock(); + if (pendingDeleteList.size() > 3) { + while (!pendingDeleteList.empty()) { + UwalId *id = pendingDeleteList.front(); + DeleteUselessUwal(id); + pendingDeleteList.pop_front(); + free(id); + } + } + truncateListLock.Unlock(); + + int ret = UwalRenew(timeLine); return ret; } -int UwalObject::UwalCreate(uint64_t startOffset, uint64_t timeLine, uint64_t uwalSplitSize) +int UwalSource::UwalRenew(uint64_t timeLine) { if (hasActiveUwal) { LogErr(ERROR_LEVEL, ER_UWAL_STILL_IN_USE); return -1; } - UwalDurability dura; - dura.azCnt = 1; - dura.originNum = 1; - dura.redundancyNum = 0; - dura.reliabilityType = 1; + UwalDurability dura = {1, 1, 0, 1}; UwalAffinityPolicy affinity; affinity.partId = 0; @@ -163,9 +176,9 @@ int UwalObject::UwalCreate(uint64_t startOffset, uint64_t timeLine, uint64_t uwa desc.perfType = UWAL_PERF_TYPE_SSD; desc.stripe = UWAL_STRIPE_BUTT; desc.io = UWAL_IO_RANDOM; - desc.dataSize = uwalSplitSize; + desc.dataSize = UWAL_SIZE_2GB; desc.startTimeLine = timeLine; - desc.startWriteOffset = startOffset; + desc.startWriteOffset = 0; desc.durability = dura; desc.flags = UWAL_CREATE_DEGRADE_LOSSANY; desc.affinity = affinity; @@ -196,7 +209,7 @@ int UwalObject::UwalCreate(uint64_t startOffset, uint64_t timeLine, uint64_t uwa return ret; } -int UwalObject::DeleteUselessUwal(UwalId *id) +int UwalSource::DeleteUselessUwal(UwalId *id) { UwalDeleteParam param; param.uwalId = id; @@ -204,60 +217,108 @@ int UwalObject::DeleteUselessUwal(UwalId *id) return ock_uwal_delete(¶m); } -void UwalObject::GetLocalStateInfo(NodeStateInfo *nodeStateInfo) +int UwalSource::UwalBinlogSync() +{ + if (enableSyncAppend) { + return 0; + } + + uint64_t tmpSyncPoint = GetSyncPoint(); + uint64_t tmpSyncIndex = GetSyncIndex(); + + // wait for sync + while (tmpSyncIndex < tmpSyncPoint) { + usleep(50); + tmpSyncIndex = GetSyncIndex(); + } + return 0; +} + +int UwalSource::UwalRegisterReplica(uint32_t nodeId) { - nodeStateInfo->nodeId = GetNodeId(); - nodeStateInfo->state = NODE_STATE_UP; - nodeStateInfo->groupId = 0; - nodeStateInfo->groupLevel = 0; - - NetInfo netInfo; - netInfo.ipv4Addr = ock_uwal_ipv4_inet_to_int(node_ip); - netInfo.port = GetListenPort(); - netInfo.protocol = GetProtocol(); - - NetList netList; - netList.num = 1; - netList.list[0] = netInfo; - nodeStateInfo->netList = netList; + auto iter = registeredReplica.find(nodeId); + if (iter != registeredReplica.end()) { + LogErr(INFORMATION_LEVEL, ER_UWAL_MASTER_NOTIFY_REPEATEDLY); + return 0; + } + registeredReplica.insert(nodeId); + + LogPluginErr(SYSTEM_LEVEL, ER_UWAL_MASTER_START_NOTIFY, nodeId); + + NodeStateList *nodeList = (NodeStateList *)malloc( + sizeof(NodeStateList) + sizeof(NodeStateInfo) * (registeredReplica.size() + 1)); + nodeList->localNodeId = GetNodeId(); + nodeList->masterNodeId = GetNodeId(); + nodeList->nodeNum = registeredReplica.size() + 1; + + int count = 0; + for (const auto& replicaNodeId : registeredReplica) { + nodeList->nodeList[count].groupId = 1; + nodeList->nodeList[count].groupLevel = 1; + nodeList->nodeList[count].nodeId = replicaNodeId; + nodeList->nodeList[count].state = NODE_STATE_UP; + nodeList->nodeList[count].netList.num = 1; + nodeList->nodeList[count].netList.list[0].ipv4Addr = ock_uwal_ipv4_inet_to_int(replicaInfos[replicaNodeId].ip); + nodeList->nodeList[count].netList.list[0].port = GetPort(); + nodeList->nodeList[count].netList.list[0].protocol = NET_PROTOCOL_TCP; + if (replicaInfos[replicaNodeId].protocol == NET_PROTOCOL_RDMA && GetProtocol() == NET_PROTOCOL_RDMA) { + nodeList->nodeList[count].netList.list[0].protocol = NET_PROTOCOL_RDMA; + } + + count++; + } + + NodeStateInfo sourceStateInfo; + GetLocalStateInfo(&sourceStateInfo); + nodeList->nodeList[count] = sourceStateInfo; + int ret = UwalSyncNotify(nodeList); + free(nodeList); + return ret; } -void UwalNotifyCallback(void *ctx, int ret) +void UwalSource::UwalBackendThread() { - CBParams *cbParams = (CBParams *)ctx; - pthread_mutex_lock(&cbParams->mutex); - cbParams->cbResult = true; - cbParams->ret = ret; - pthread_mutex_unlock(&cbParams->mutex); - pthread_cond_signal(&cbParams->cond); + backendThreadStarted.store(true); + int count = 1; + bool hit = false; + while (!needStop) { + if (hit) { + count = count % 10 + 1; + hit = false; + } + + truncateListLock.Lock(); + if (readyTruncateList.size() > count) { + hit = true; + } + while (readyTruncateList.size() > count) { + pendingDeleteList.emplace_back(readyTruncateList.front()); + readyTruncateList.pop_front(); + } + truncateListLock.Unlock(); + sleep(1); + } } -int UwalSyncNotify(NodeStateList *nodeList) +void UwalSource::StartBackendThread() { - CBParams cbParams = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, false, 0}; - pthread_mutex_lock(&cbParams.mutex); - int ret = ock_uwal_notify_nodelist_change(nodeList, UwalNotifyCallback, (void *)&cbParams); - if (ret != 0) { - pthread_mutex_unlock(&cbParams.mutex); - return ret; + needStop = false; + std::thread tmpThread(&UwalSource::UwalBackendThread, this); + backendThread = std::move(tmpThread); + pthread_setname_np(backendThread.native_handle(), "uwalSourcebackend"); + + while (!backendThreadStarted.load()) { + usleep(10); } - uint16 count = 0; - while (!cbParams.cbResult) { - struct timespec ts; - clock_gettime(CLOCK_REALTIME, &ts); - ts.tv_sec += 3; - ts.tv_nsec = 0; - int condWaitRet = 0; - condWaitRet = pthread_cond_timedwait(&cbParams.cond, &cbParams.mutex, &ts); - if (condWaitRet == ETIMEDOUT) { - ++count; - LogErr(SYSTEM_LEVEL, ER_UWAL_NOTIFY_WAIT, count); - } +} + +void UwalSource::StopBackendThread() +{ + if (needStop == true || !backendThreadStarted.load()) { + return; } - pthread_mutex_unlock(&cbParams.mutex); - bool success = (cbParams.ret == 0); - if (success) { - LogErr(SYSTEM_LEVEL, ER_UWAL_NOTIFY_SUCCESS); + needStop = true; + if (backendThread.native_handle()) { + backendThread.join(); } - return success ? 0 : -1; } \ No newline at end of file diff --git a/plugin/uwal/uwal_source.h b/plugin/uwal/uwal_source.h index e8a1bb74ca2cb0b38b613750c178d552324133a7..0c2096b2e8912195983a7a3bb59ad0be103725c7 100644 --- a/plugin/uwal/uwal_source.h +++ b/plugin/uwal/uwal_source.h @@ -5,23 +5,7 @@ #ifndef __UWAL_SOURCE_H__ #define __UWAL_SOURCE_H__ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "mysql/plugin.h" -#include "sql/derror.h" // ER_THD -#include "mysqld_error.h" -#include "plugin/uwal/uwal_adaptor.h" - -#define UWAL_ELEM_NUM 100 +#include "plugin/uwal/uwal_common.h" constexpr uint32_t PAGE_ALIGN = 4096; constexpr uint32_t UWAL_MEM_SEG_COUNT = 256; @@ -31,15 +15,6 @@ constexpr uint64_t UWAL_SIZE_1MB = 1024 * UWAL_SIZE_1KB; constexpr uint64_t UWAL_SIZE_1GB = 1024 * UWAL_SIZE_1MB; constexpr uint64_t UWAL_SIZE_2GB = 2 * UWAL_SIZE_1GB; -extern bool uwal_enabled; -extern unsigned long uwal_disk_size; -extern unsigned int uwal_id; -extern unsigned int uwal_port; -extern char *uwal_ip; -extern char *uwal_protocol; -extern char *uwal_devices_path; -extern char *uwal_log_path; - extern unsigned int uwal_buffer_aligned_size; extern bool uwal_sync_append; @@ -47,9 +22,9 @@ extern unsigned int uwal_replica_id; extern char *uwal_replica_ip; extern char *uwal_replica_protocol; -extern unsigned int uwal_master_id; -extern char *uwal_master_ip; -extern char *uwal_master_protocol; +class UwalSource; + +extern UwalSource *uwalSource; typedef struct UwalAsyncCbCtx { uintptr_t cacheAddress; @@ -57,124 +32,49 @@ typedef struct UwalAsyncCbCtx { uint64_t outOffset; } UwalAsyncCbCtx; -typedef struct UwalObjNodeInfo { - uint32_t node_id; - char node_ip[UWAL_IP_LEN]; - NetProtocol protocol; -} UwalObjNodeInfo; - -typedef struct CBParams { - pthread_mutex_t mutex; - pthread_cond_t cond; - bool cbResult; - int ret; -} CBParams; +using BinlogWriteHandle = std::function; -class UwalSpinLock { +class UwalSource : public UwalBase { public: - UwalSpinLock() = default; - ~UwalSpinLock() = default; - - UwalSpinLock(const UwalSpinLock &) = delete; - UwalSpinLock &operator = (const UwalSpinLock &) = delete; - UwalSpinLock(UwalSpinLock &&) = delete; - UwalSpinLock &operator = (UwalSpinLock &&) = delete; - - inline void TryLock() + UwalSource() : backendThreadStarted(false) { - mFlag.test_and_set(std::memory_order_acquire); + alignedSize = uwal_buffer_aligned_size; + enableSyncAppend = uwal_sync_append; + + SetReplicaInfo(); + RegisterBinlogWriter(); } - inline void Lock() + inline bool HasActiveUwal() { - while (mFlag.test_and_set(std::memory_order_acquire)) { - } + return hasActiveUwal; } - inline void Unlock() + inline void IncreaseSyncIndex() { - mFlag.clear(std::memory_order_release); + syncIndexLock.Lock(); + syncIndex += 1; + syncIndexLock.Unlock(); } private: - std::atomic_flag mFlag = ATOMIC_FLAG_INIT; -}; - -class UwalObject { -private: - bool enabled = false; - uint64_t disk_size = 0; - uint32_t node_id = 0; - uint32_t listen_port = 0; - char *devices_path = nullptr; - char *log_path = nullptr; - NetProtocol protocol = NET_PROTOCOL_TCP; - UwalCfgElem elem[UWAL_ELEM_NUM]; - UwalBaseInfo info = {0}; - - bool masterNode = false; -public: - UwalObjNodeInfo replica_infos[MAX_NODE]; - UwalObjNodeInfo master_info; - char node_ip[UWAL_IP_LEN]; - std::set registered_replica; - uint32_t replica_num; - bool enableSyncAppend; - - UwalSpinLock appendFlushLock; - uint64_t appendFlushIndex; - - UwalSpinLock appendSyncLock; - uint64_t appendSyncIndex; - - UwalSpinLock syncPointLock; - uint64_t appendSyncPoint; - - char *uwalBuffer; - bool hasBuffer; - uint32_t bufferOffset; - uint32_t alignedSize; - - bool hasActiveUwal; - UwalSpinLock truncateListLock; - std::list readyTruncateList; - std::list pendingDeleteList; - UwalId *activeUwalId; - -public: - UwalObject() + void SetReplicaInfo() { - alignedSize = 262144; for (int i = 0; i < MAX_NODE; i++) { - replica_infos[i].node_id = NODE_ID_INVALID; - replica_infos[i].protocol = NET_PROTOCOL_TCP; + replicaInfos[i].id = NODE_ID_INVALID; + replicaInfos[i].protocol = NET_PROTOCOL_TCP; + } + + if (uwal_replica_ip == nullptr) { + return; + } + replicaInfos[uwal_replica_id].id = uwal_replica_id; + memcpy(replicaInfos[uwal_replica_id].ip, uwal_replica_ip, strlen(uwal_replica_ip) + 1); + if (!strcasecmp(UWAL_PROTOCOL_RDMA, uwal_replica_protocol)) { + replicaInfos[uwal_replica_id].protocol = NET_PROTOCOL_RDMA; } - enableSyncAppend = false; - - appendFlushIndex = 0; - appendSyncIndex = 0; - appendSyncPoint = 0; - uwalBuffer = nullptr; - bufferOffset = 0; - hasBuffer = false; - replica_num = 0; - - hasActiveUwal = false; - activeUwalId = nullptr; } - int UwalInit(); - - int UwalWrite(UwalId *id, int nBytes, char *buf, UwalNodeInfo *infos); - - int UwalWriteCopyFree(UwalId *id, int nBytes, char *buf, UwalNodeInfo *infos); - - int UwalWriteAsyncCopyFree(UwalId *id, UwalCallBack *uwalCB, UwalNodeInfo *infos); - - int UwalCreate(uint64_t startOffset, uint64_t timeLine, uint64_t uwalSplitSize); - - int DeleteUselessUwal(UwalId *id); - inline void RefreshBuffer(bool force) { if (force || !hasBuffer) { @@ -185,140 +85,92 @@ public: } } - void SetMasterNode(bool value) + inline uint64_t GetSyncIndex() { - masterNode = value; + syncIndexLock.Lock(); + uint64_t tmpSyncIndex = syncIndex; + syncIndexLock.Unlock(); + return tmpSyncIndex; } - void SetEnabled(bool value) + inline void UpdateSyncPoint() { - enabled = value; + syncPointLock.Lock(); + syncPoint = flushIndex; + syncPointLock.Unlock(); } - void SetDiskSize(uint64_t value) + inline uint64_t GetSyncPoint() { - disk_size = value; + syncPointLock.Lock(); + uint64_t tmpSyncPoint = syncPoint; + syncPointLock.Unlock(); + return tmpSyncPoint; } - void SetNodeId(uint32_t value) - { - node_id = value; - } + int UwalWrite(uint64_t *outOffset, UwalCallBack *uwalCB, UwalNodeInfo *infos); - uint32_t GetNodeId() - { - return node_id; - } + void RegisterBinlogWriter(); - void SetListenPort(uint32_t value) - { - listen_port = value; - } + int UwalRenew(uint64_t timeLine); - uint32_t GetListenPort() - { - return listen_port; - } + int DeleteUselessUwal(UwalId *id); - void SetNodeIP(char *value) - { - memcpy(node_ip, value, strlen(value) + 1); - } +public: - void SetDevicesPath(char *value) - { - devices_path = value; - } + int UwalSourceInitNotify(); - void SetLogPath(char *value) - { - log_path = value; - } + int UwalWriteSync(); - void SetProtocol(char *value) - { - if (!strcasecmp(UWAL_PROTOCOL_RDMA, value)) { - protocol = NET_PROTOCOL_RDMA; - } - } + int UwalWriteAsync(); - NetProtocol GetProtocol() - { - return protocol; - } + int UwalBinlogWrite(const unsigned char *buffer, my_off_t length); - void SetReplicaInfo() - { - if (uwal_replica_ip == nullptr) { - return; - } - replica_infos[uwal_replica_id].node_id = uwal_replica_id; - memcpy(replica_infos[uwal_replica_id].node_ip, uwal_replica_ip, strlen(uwal_replica_ip) + 1); - if (!strcasecmp(UWAL_PROTOCOL_RDMA, uwal_replica_protocol)) { - replica_infos[uwal_replica_id].protocol = NET_PROTOCOL_RDMA; - } - } + int UwalBinlogFlush(); - void SetMasterInfo() - { - if (uwal_master_ip == nullptr) { - return; - } - master_info.node_id = uwal_master_id; - memcpy(master_info.node_ip, uwal_master_ip, strlen(uwal_master_ip) + 1); - if (!strcasecmp(UWAL_PROTOCOL_RDMA, uwal_master_protocol)) { - master_info.protocol = NET_PROTOCOL_RDMA; - } - } + int UwalRotate(uint64_t timeLine); - void UwalInitConfig(char *name, char *value, int index) - { - elem[index].substr = name; - elem[index].value = value; - } + int UwalBinlogSync(); - char *ConvertIntegerToString(int value, int length) - { - char *buffer = new char[length]; - sprintf(buffer, "%d\0", value); - return buffer; - } + int UwalRegisterReplica(uint32_t nodeId); - char *ConvertIntegerToString(uint32_t value, int length) - { - char *buffer = new char[length]; - sprintf(buffer, "%lu\0", value); - return buffer; - } + void UwalBackendThread(); - char *ConvertIntegerToString(uint64_t value, int length) - { - char *buffer = new char[length]; - sprintf(buffer, "%llu\0", value); - return buffer; - } + void StartBackendThread(); - void UwalInitConfigProtocol(int index) - { - switch(protocol) { - case NET_PROTOCOL_TCP: - UwalInitConfig("ock.uwal.protocol", UWAL_PROTOCOL_TCP, index); - break; - case NET_PROTOCOL_RDMA: - UwalInitConfig("ock.uwal.protocol", UWAL_PROTOCOL_RDMA, index); - break; - default: - break; - } - } + void StopBackendThread(); - void UwalValueInit(); +private: + std::thread backendThread; + std::atomic_bool backendThreadStarted; + bool needStop = false; - void GetLocalStateInfo(NodeStateInfo *nodeStateInfo); -}; + bool enableSyncAppend = false; + bool hasActiveUwal = false; -void UwalNotifyCallback(void *ctx, int ret); + UwalBaseInfo info = {0}; + UwalObjNodeInfo replicaInfos[MAX_NODE]; + + UwalSpinLock syncIndexLock; + uint64_t syncIndex = 0; + + UwalSpinLock syncPointLock; + uint64_t syncPoint = 0; + + uint64_t flushIndex = 0; + + std::set registeredReplica; -int UwalSyncNotify(NodeStateList *nodeList); + char *uwalBuffer = nullptr; + bool hasBuffer = false; + uint32_t bufferOffset = 0; + uint32_t alignedSize = 0; + + UwalSpinLock truncateListLock; + std::list readyTruncateList; + std::list pendingDeleteList; + UwalId *activeUwalId = nullptr; + BinlogWriteHandle writer; +}; #endif \ No newline at end of file diff --git a/plugin/uwal/uwal_source_plugin.cc b/plugin/uwal/uwal_source_plugin.cc index 13656de424f40fce623d41d624ca0f4ddd9d3482..dc9a95fc6118b9d1aa6f9e18f0dce81e10487df7 100644 --- a/plugin/uwal/uwal_source_plugin.cc +++ b/plugin/uwal/uwal_source_plugin.cc @@ -23,8 +23,8 @@ static SERVICE_TYPE(registry) *reg_srv = nullptr; SERVICE_TYPE(log_builtins) *log_bi = nullptr; SERVICE_TYPE(log_builtins_string) *log_bs = nullptr; -UwalObject *uwalObj = nullptr; -pthread_t uwalBackendThread; +UwalSource *uwalSource = nullptr; + bool stopBackend = false; static MYSQL_SYSVAR_BOOL( @@ -157,298 +157,53 @@ static SYS_VAR *uwal_master_system_vars[] = { nullptr, }; -int UwalPrimaryInitNotify() -{ - NodeStateInfo primaryStateInfo; - uwalObj->GetLocalStateInfo(&primaryStateInfo); - - NodeStateList *nodeList = (NodeStateList *)malloc(sizeof(NodeStateList) + sizeof(NodeStateInfo)); - nodeList->localNodeId = uwalObj->GetNodeId(); - nodeList->masterNodeId = uwalObj->GetNodeId(); - nodeList->nodeNum = 1; - nodeList->nodeList[0] = primaryStateInfo; - - int ret = UwalSyncNotify(nodeList); - free(nodeList); - return ret; -} - static int uwal_binlog_after_open(const char *binlog_name) { - if (uwalObj->hasActiveUwal) { - // move curUwal to truncate list - uwalObj->truncateListLock.Lock(); - uwalObj->readyTruncateList.emplace_back(uwalObj->activeUwalId); - uwalObj->truncateListLock.Unlock(); - uwalObj->activeUwalId = nullptr; - uwalObj->hasActiveUwal = false; - } - uwalObj->truncateListLock.Lock(); - if (uwalObj->pendingDeleteList.size() > 3) { - while (!uwalObj->pendingDeleteList.empty()) { - UwalId *id = uwalObj->pendingDeleteList.front(); - uwalObj->DeleteUselessUwal(id); - uwalObj->pendingDeleteList.pop_front(); - free(id); - } - } - uwalObj->truncateListLock.Unlock(); char *name = (char *)binlog_name; char *extPos = strrchr(name, FN_EXTCHAR); uint32_t logIndex = strtoul(extPos + 1, NULL, 10); - int ret = uwalObj->UwalCreate(0, logIndex, UWAL_SIZE_2GB); - return ret; -} - -static int uwal_write_sync() -{ - UwalNodeInfo *infos = (UwalNodeInfo *)malloc(sizeof(UwalNodeInfo) + MAX_NODE * sizeof(UwalNodeStatus)); - - int ret = uwalObj->UwalWriteCopyFree(uwalObj->activeUwalId, uwalObj->bufferOffset, uwalObj->uwalBuffer, infos); - if (ret != 0) { - LogPluginErr(WARNING_LEVEL, ER_UWAL_SYNC_WRITE_FAILED, ret); - } - uwalObj->RefreshBuffer(true); - free(infos); - return ret; -} - -static int uwal_binlog_write_sync(const unsigned char *buffer, my_off_t length) -{ - int ret = 0; - uwalObj->RefreshBuffer(false); - - if (uwalObj->bufferOffset + length < uwalObj->alignedSize) { - memcpy(uwalObj->uwalBuffer + uwalObj->bufferOffset, buffer, length); - uwalObj->bufferOffset += length; - return 0; - } - - // copy align - uint64_t tmpOffset = uwalObj->alignedSize - uwalObj->bufferOffset; - memcpy(uwalObj->uwalBuffer + uwalObj->bufferOffset, buffer, tmpOffset); - uwalObj->bufferOffset += tmpOffset; - - ret = uwal_write_sync(); - - while ((length - tmpOffset) >= uwalObj->alignedSize) { - memcpy(uwalObj->uwalBuffer, buffer + tmpOffset, uwalObj->alignedSize); - uwalObj->bufferOffset = uwalObj->alignedSize; - tmpOffset += uwalObj->alignedSize; - ret = uwal_write_sync(); - } - if (length - tmpOffset > 0) { - memcpy(uwalObj->uwalBuffer, buffer + tmpOffset, length - tmpOffset); - uwalObj->bufferOffset = length - tmpOffset; - } - - return ret; -} - -void UwalWriteAsyncCallBack(void *cbCtx, int retCode) -{ - UwalAsyncCbCtx *curCbCtx = (UwalAsyncCbCtx *)cbCtx; - uwalObj->appendSyncLock.Lock(); - uwalObj->appendSyncIndex += 1; - uwalObj->appendSyncLock.Unlock(); - - free(curCbCtx->info); - free(curCbCtx); -} - -static int uwal_write_async() -{ - UwalAsyncCbCtx *curCbCtx = (UwalAsyncCbCtx *)malloc(sizeof(UwalAsyncCbCtx)); - UwalNodeInfo *infos = (UwalNodeInfo *)malloc(sizeof(UwalNodeInfo) + MAX_NODE * sizeof(UwalNodeStatus)); - curCbCtx->info = infos; - UwalCallBack uwalCB = {UwalWriteAsyncCallBack, curCbCtx}; - - uwalObj->appendFlushIndex += 1; - int ret = uwalObj->UwalWriteAsyncCopyFree(uwalObj->activeUwalId, &uwalCB, infos); - if (ret != 0) { - LogPluginErr(WARNING_LEVEL, ER_UWAL_SYNC_WRITE_FAILED, ret); - uwalObj->appendFlushIndex -= 1; - } - uwalObj->RefreshBuffer(true); - return ret; -} - -static int uwal_binlog_write_async(const unsigned char *buffer, my_off_t length) -{ - int ret = 0; - uwalObj->RefreshBuffer(false); - - if (uwalObj->bufferOffset + length < uwalObj->alignedSize) { - memcpy(uwalObj->uwalBuffer + uwalObj->bufferOffset, buffer, length); - uwalObj->bufferOffset += length; - return 0; - } - - // copy align - uint64_t tmpOffset = uwalObj->alignedSize - uwalObj->bufferOffset; - memcpy(uwalObj->uwalBuffer + uwalObj->bufferOffset, buffer, tmpOffset); - uwalObj->bufferOffset += tmpOffset; - ret = uwal_write_async(); - - while ((length - tmpOffset) >= uwalObj->alignedSize) { - memcpy(uwalObj->uwalBuffer, buffer + tmpOffset, uwalObj->alignedSize); - uwalObj->bufferOffset = uwalObj->alignedSize; - tmpOffset += uwalObj->alignedSize; - ret = uwal_write_async(); - } - if (length - tmpOffset > 0) { - memcpy(uwalObj->uwalBuffer, buffer + tmpOffset, length - tmpOffset); - uwalObj->bufferOffset = length - tmpOffset; - } - + int ret = uwalSource->UwalRotate(logIndex); return ret; } static int uwal_binlog_write(const unsigned char *buffer, my_off_t length, my_off_t position) { - if (!uwalObj->hasActiveUwal) { + if (!uwalSource->HasActiveUwal()) { LogPluginErr(WARNING_LEVEL, ER_UWAL_NOT_SUPPORT); return 0; } - if (uwalObj->enableSyncAppend) { - return uwal_binlog_write_sync(buffer, length); - } - - return uwal_binlog_write_async(buffer, length); -} - -static int uwal_binlog_flush_sync() -{ - int ret = 0; - if (uwalObj->hasBuffer && uwalObj->bufferOffset > 0) { - ret = uwal_write_sync(); - } - - return ret; -} - -static int uwal_binlog_flush_async() -{ - int ret = 0; - if (uwalObj->hasBuffer && uwalObj->bufferOffset > 0) { - ret = uwal_write_async(); - } - - // update sync point - uwalObj->syncPointLock.Lock(); - uwalObj->appendSyncPoint = uwalObj->appendFlushIndex; - uwalObj->syncPointLock.Unlock(); - - return ret; + return uwalSource->UwalBinlogWrite(buffer, length); } static int uwal_binlog_flush() { - if (!uwalObj->hasActiveUwal) { + if (!uwalSource->HasActiveUwal()) { // enable uwal in RUN-time, IO cache contains unflushed buffer LogPluginErr(WARNING_LEVEL, ER_UWAL_NOT_SUPPORT); return 0; } - if (uwalObj->enableSyncAppend) { - return uwal_binlog_flush_sync(); - } - - return uwal_binlog_flush_async(); + return uwalSource->UwalBinlogFlush(); } static int uwal_binlog_after_register(uint32 server_id) { - LogPluginErr(SYSTEM_LEVEL, ER_UWAL_MASTER_START_NOTIFY, server_id - 1); - auto iter = uwalObj->registered_replica.find(server_id - 1); - if (iter != uwalObj->registered_replica.end()) { - LogPluginErr(INFORMATION_LEVEL, ER_UWAL_MASTER_NOTIFY_REPEATEDLY); - return 0; - } - uwalObj->registered_replica.insert(server_id - 1); - NodeStateList *nodeList = (NodeStateList *)malloc( - sizeof(NodeStateList) + sizeof(NodeStateInfo) * (uwalObj->registered_replica.size() + 1)); - nodeList->localNodeId = uwalObj->GetNodeId(); - nodeList->masterNodeId = uwalObj->GetNodeId(); - nodeList->nodeNum = uwalObj->registered_replica.size() + 1; - - int count = 0; - for (const auto& replicaNodeId : uwalObj->registered_replica) { - nodeList->nodeList[count].groupId = 1; - nodeList->nodeList[count].groupLevel = 1; - nodeList->nodeList[count].nodeId = replicaNodeId; - nodeList->nodeList[count].state = NODE_STATE_UP; - nodeList->nodeList[count].netList.num = 1; - nodeList->nodeList[count].netList.list[0].ipv4Addr = - ock_uwal_ipv4_inet_to_int(uwalObj->replica_infos[replicaNodeId].node_ip); - nodeList->nodeList[count].netList.list[0].protocol = NET_PROTOCOL_TCP; - if (uwalObj->replica_infos[replicaNodeId].protocol == NET_PROTOCOL_RDMA - && uwalObj->GetProtocol() == NET_PROTOCOL_RDMA) { - nodeList->nodeList[count].netList.list[0].protocol = NET_PROTOCOL_RDMA; - } - nodeList->nodeList[count].netList.list[0].port = uwalObj->GetListenPort(); - count++; - } - NodeStateInfo primaryStateInfo; - uwalObj->GetLocalStateInfo(&primaryStateInfo); - nodeList->nodeList[count] = primaryStateInfo; - int ret = UwalSyncNotify(nodeList); - uwalObj->replica_num = count; - free(nodeList); - return ret; + return uwalSource->UwalRegisterReplica(server_id - 1); } static int uwal_binlog_after_sync() { - // check uwal status here - - if (uwalObj->enableSyncAppend) { - return 0; - } - - // get current sync point - uwalObj->syncPointLock.Lock(); - uint64_t tmpSyncPoint = uwalObj->appendSyncPoint; - uwalObj->syncPointLock.Unlock(); - - // wait for sync - uwalObj->appendSyncLock.Lock(); - uint64_t tmpSyncIndex = uwalObj->appendSyncIndex; - uwalObj->appendSyncLock.Unlock(); - // overflow? - - while (tmpSyncIndex < tmpSyncPoint) { - usleep(50); - uwalObj->appendSyncLock.Lock(); - tmpSyncIndex = uwalObj->appendSyncIndex; - uwalObj->appendSyncLock.Unlock(); - } - - return 0; + return uwalSource->UwalBinlogSync(); } -void *UwalBackendThread(void *param) +static void ReleaseUwalSource() { - int count = 1; - bool hit = false; - while (!stopBackend) { - if (hit) { - count = count % 10 + 1; - hit = false; - } - - uwalObj->truncateListLock.Lock(); - if (uwalObj->readyTruncateList.size() > count) { - hit = true; - } - while (uwalObj->readyTruncateList.size() > count) { - uwalObj->pendingDeleteList.emplace_back(uwalObj->readyTruncateList.front()); - uwalObj->readyTruncateList.pop_front(); - } - uwalObj->truncateListLock.Unlock(); - sleep(1); + if (uwalSource != nullptr) { + uwalSource->StopBackendThread(); + delete uwalSource; + uwalSource = nullptr; } } @@ -467,46 +222,42 @@ static int uwal_master_plugin_init(void *p) // Initialize error logging service. if (init_logging_service_for_plugin(®_srv, &log_bi, &log_bs)) return 1; - uwalObj = new UwalObject(); - // dlopen uwal here int ret = uwal_init_symbols(); if (ret != 0) { return 1; } - ret = uwalObj->UwalInit(); + uwalSource = new UwalSource(); + ret = uwalSource->UwalInit(); if (ret != 0) { + ReleaseUwalSource(); return 1; } - uwalObj->SetMasterNode(true); - uwalObj->SetReplicaInfo(); + + uwalSource->StartBackendThread(); // notify - ret = UwalPrimaryInitNotify(); + ret = uwalSource->UwalSourceInitNotify(); if (ret != 0) { LogPluginErr(ERROR_LEVEL, ER_UWAL_PRIMARY_NOTIFY_FAILED, ret); + ReleaseUwalSource(); return 1; } // register observer if (register_binlog_uwal_observer(&uwal_observer, p)) { + ReleaseUwalSource(); return 1; } - // start backend thread - if (pthread_create(&uwalBackendThread, NULL, UwalBackendThread, NULL) != 0) { - return 1; - } - pthread_setname_np(uwalBackendThread, "uwalbak"); - return 0; } static int uwal_master_plugin_deinit(void *p) { // the plugin was not initialized, thre is nothing to do here - delete uwalObj; + ReleaseUwalSource(); // unregister observer if (unregister_binlog_uwal_observer(&uwal_observer, p)) {