From 264e1f01c9b464dcfaedc55ea898efc9491221d2 Mon Sep 17 00:00:00 2001 From: lengmengchao Date: Mon, 7 Nov 2022 15:58:49 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8E=A8=E9=80=81930=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 99 + NOTICE.txt | 9 + build.sh | 58 + package/globalcache-adaptorlib_pack.sh | 39 + patch/README.md | 1 + patch/ceph-global-cache-tls.patch | 724 +++ patch/ceph-global-cache.patch | 5839 +++++++++++++++++++ patch/globalcache-ceph-adaptor-spec.patch | 83 + src/CMakeLists.txt | 26 + src/ceph_proxy/CMakeLists.txt | 116 + src/ceph_proxy/CcmAdaptor.cc | 54 + src/ceph_proxy/CcmAdaptor.h | 19 + src/ceph_proxy/CephExport.h | 41 + src/ceph_proxy/CephProxy.cc | 298 + src/ceph_proxy/CephProxy.h | 86 + src/ceph_proxy/CephProxyFtds.cc | 51 + src/ceph_proxy/CephProxyFtds.h | 89 + src/ceph_proxy/CephProxyInterface.cc | 381 ++ src/ceph_proxy/CephProxyInterface.h | 701 +++ src/ceph_proxy/CephProxyLog.h | 128 + src/ceph_proxy/CephProxyOp.cc | 31 + src/ceph_proxy/CephProxyOp.h | 208 + src/ceph_proxy/ConfigRead.cc | 189 + src/ceph_proxy/ConfigRead.h | 50 + src/ceph_proxy/Gcbufferlist.h | 21 + src/ceph_proxy/PoolContext.h | 127 + src/ceph_proxy/RadosMonitor.cc | 588 ++ src/ceph_proxy/RadosMonitor.h | 97 + src/ceph_proxy/RadosWorker.cc | 127 + src/ceph_proxy/RadosWorker.h | 338 ++ src/ceph_proxy/RadosWrapper.cc | 1273 ++++ src/ceph_proxy/RadosWrapper.h | 146 + src/ceph_proxy/RbdWrapper.cc | 862 +++ src/dependency/include/sa_def.h | 213 + src/dependency/include/sa_export.h | 57 + src/server_adaptor/CMakeLists.txt | 93 + src/server_adaptor/ClassHandler.cc | 339 ++ src/server_adaptor/ClassHandler.h | 131 + src/server_adaptor/class_api.cc | 782 +++ src/server_adaptor/client_op_queue.h | 45 + src/server_adaptor/config_read.cpp | 206 + src/server_adaptor/config_read.h | 50 + src/server_adaptor/msg_module.cpp | 281 + src/server_adaptor/msg_module.h | 34 + src/server_adaptor/msg_perf_record.cc | 103 + src/server_adaptor/msg_perf_record.h | 80 + src/server_adaptor/network_module.cpp | 1320 +++++ src/server_adaptor/network_module.h | 217 + src/server_adaptor/objclass.h | 168 + src/server_adaptor/osa.cpp | 440 ++ src/server_adaptor/osa.h | 44 + src/server_adaptor/sa_ftds_osa.h | 34 + src/server_adaptor/sa_server_dispatcher.cpp | 69 + src/server_adaptor/sa_server_dispatcher.h | 59 + src/server_adaptor/salog.cpp | 118 + src/server_adaptor/salog.h | 78 + 56 files changed, 17860 insertions(+) create mode 100644 CMakeLists.txt create mode 100644 NOTICE.txt create mode 100644 build.sh create mode 100644 package/globalcache-adaptorlib_pack.sh create mode 100644 patch/README.md create mode 100644 patch/ceph-global-cache-tls.patch create mode 100644 patch/ceph-global-cache.patch create mode 100644 patch/globalcache-ceph-adaptor-spec.patch create mode 100644 src/CMakeLists.txt create mode 100644 src/ceph_proxy/CMakeLists.txt create mode 100644 src/ceph_proxy/CcmAdaptor.cc create mode 100644 src/ceph_proxy/CcmAdaptor.h create mode 100644 src/ceph_proxy/CephExport.h create mode 100644 src/ceph_proxy/CephProxy.cc create mode 100644 src/ceph_proxy/CephProxy.h create mode 100644 src/ceph_proxy/CephProxyFtds.cc create mode 100644 src/ceph_proxy/CephProxyFtds.h create mode 100644 src/ceph_proxy/CephProxyInterface.cc create mode 100644 src/ceph_proxy/CephProxyInterface.h create mode 100644 src/ceph_proxy/CephProxyLog.h create mode 100644 src/ceph_proxy/CephProxyOp.cc create mode 100644 src/ceph_proxy/CephProxyOp.h create mode 100644 src/ceph_proxy/ConfigRead.cc create mode 100644 src/ceph_proxy/ConfigRead.h create mode 100644 src/ceph_proxy/Gcbufferlist.h create mode 100644 src/ceph_proxy/PoolContext.h create mode 100644 src/ceph_proxy/RadosMonitor.cc create mode 100644 src/ceph_proxy/RadosMonitor.h create mode 100644 src/ceph_proxy/RadosWorker.cc create mode 100644 src/ceph_proxy/RadosWorker.h create mode 100644 src/ceph_proxy/RadosWrapper.cc create mode 100644 src/ceph_proxy/RadosWrapper.h create mode 100644 src/ceph_proxy/RbdWrapper.cc create mode 100644 src/dependency/include/sa_def.h create mode 100644 src/dependency/include/sa_export.h create mode 100644 src/server_adaptor/CMakeLists.txt create mode 100644 src/server_adaptor/ClassHandler.cc create mode 100644 src/server_adaptor/ClassHandler.h create mode 100644 src/server_adaptor/class_api.cc create mode 100644 src/server_adaptor/client_op_queue.h create mode 100644 src/server_adaptor/config_read.cpp create mode 100644 src/server_adaptor/config_read.h create mode 100644 src/server_adaptor/msg_module.cpp create mode 100644 src/server_adaptor/msg_module.h create mode 100644 src/server_adaptor/msg_perf_record.cc create mode 100644 src/server_adaptor/msg_perf_record.h create mode 100644 src/server_adaptor/network_module.cpp create mode 100644 src/server_adaptor/network_module.h create mode 100644 src/server_adaptor/objclass.h create mode 100644 src/server_adaptor/osa.cpp create mode 100644 src/server_adaptor/osa.h create mode 100644 src/server_adaptor/sa_ftds_osa.h create mode 100644 src/server_adaptor/sa_server_dispatcher.cpp create mode 100644 src/server_adaptor/sa_server_dispatcher.h create mode 100644 src/server_adaptor/salog.cpp create mode 100644 src/server_adaptor/salog.h diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..97cac18 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,99 @@ +cmake_minimum_required(VERSION 3.14.1) +project(ADAPTOR) + +set(BASE_DIR_UP ${CMAKE_CURRENT_SOURCE_DIR}/..) +set(BASE_DIR ${CMAKE_CURRENT_SOURCE_DIR}) +if (NOT DEFINED CEPH_DIR) + set(CEPH_DIR ${PROJECT_SOURCE_DIR}/../../ceph-14.2.8) +endif() +if (NOT DEFINED GLOBAL_CACHE_DIR) + set(GLOBAL_CACHE_DIR ${PROJECT_SOURCE_DIR}/../global_cache) +endif() +set(GLOBAL_CACHE_INFRAS_LIB_DIR ${GLOBAL_CACHE_DIR}/lib) +message(STATUS "CEPH_DIR: " ${CEPH_DIR}) +message(STATUS "GLOBAL_CACHE_DIR: " ${GLOBAL_CACHE_DIR}) +message(STATUS "GLOBAL_CACHE_INFRAS_LIB_DIR: " ${GLOBAL_CACHE_INFRAS_LIB_DIR}) + +include_directories( + ${CEPH_DIR}/src/ + ${CEPH_DIR}/src/boost/ + ${CEPH_DIR}/src/include/ + ${CEPH_DIR}/build/include/ + ${CEPH_DIR}/build/src/include/ + ${BASE_DIR}/src/dependency/include/ +) +get_property(dirs DIRECTORY ${CMAKE_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES) +message(STATUS "adaptor include directory = ${dirs}") + +if(NOT CMAKE_BUILD_TYPE) + set(CMAKE_BUILD_TYPE DEBUG) +endif() +message(STATUS "CMAKE_BUILD_TYPE: ${CMAKE_BUILD_TYPE}") + +if(NOT DEFINED USE_ASAN) + set(USE_ASAN False) +endif() +message(STATUS "USE_ASAN: ${USE_ASAN}") + +if (${CMAKE_BUILD_TYPE} STREQUAL "RELEASE") + set(COMPILE_FLAGS "-Wl,-z,relro,-z,now -s -fstack-protector-all -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -D__linux__ -Wall -Wtype-limits -Wignored-qualifiers -Winit-self -Wpointer-arith -fno-strict-aliasing -fsigned-char -Wno-unknown-pragmas -rdynamic -ftemplate-depth-1024 -Wnon-virtual-dtor -Wno-unknown-pragmas -Wno-ignored-qualifiers -Wstrict-null-sentinel -Woverloaded-virtual -fno-new-ttp-matching -U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=2 -fstack-protector-strong -fdiagnostics-color=auto -fno-builtin-malloc -fno-builtin-calloc -fno-builtin-realloc -fno-builtin-free -O2 -DNDEBUG -fPIE -DHAVE_CONFIG_H -D__CEPH__ -D_REENTRANT -D_THREAD_SAFE -D__STDC_FORMAT_MACROS -std=c++1z -ldl -lrt -lresolv -lpthread") +else() + set(COMPILE_FLAGS "-Wl,-z,relro,-z,now -ftrapv -fstack-protector-all -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -D__linux__ -Wall -Wtype-limits -Wignored-qualifiers -Winit-self -Wpointer-arith -fno-strict-aliasing -fsigned-char -Wno-unknown-pragmas -rdynamic -ftemplate-depth-1024 -Wnon-virtual-dtor -Wno-ignored-qualifiers -Wstrict-null-sentinel -Woverloaded-virtual -fno-new-ttp-matching -DCEPH_DEBUG_MUTEX -fstack-protector-strong -fdiagnostics-color=auto -fno-builtin-malloc -fno-builtin-calloc -fno-builtin-realloc -fno-builtin-free -g -O0 -fPIE -DHAVE_CONFIG_H -D__CEPH__ -D_REENTRANT -D_THREAD_SAFE -D__STDC_FORMAT_MACROS -std=c++1z ") +endif() +set(CMAKE_CXX_FLAGS "${COMPILE_FLAGS}") + +if(NOT DEFINED UT) + set(UT False) +endif() +message(STATUS "UT: ${UT}") + +if(NOT DEFINED CLASS_PATH) + set(CLASS_PATH "/opt/gcache/lib") +endif() +message(STATUS "CLASS_PATH: ${CLASS_PATH}") + +if(NOT DEFINED CPU_TYPE) + set(CPU_TYPE "arm_64") +endif() +message(STATUS "CPU_TYPE: ${CPU_TYPE}") + +# +set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${BASE_DIR}/build/lib)#.a +set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${BASE_DIR}/build/lib)#.so + +set(GLOBAL_CACHE_ADAPTOR_LIBS sa) + +set(PROXY_LIBS proxy) +#set(PROXY_LIBS ceph_proxy_stub) + +set(CCM_LIBS -lccm_lib) +set(PERSISTENCE_LIBS -lstream -lbdm -lplog) +set(VERBS_LIBS ) +set(COMMON_LIBS -lpthread -ldl -lm) +set(OPENSSL_LIBS -lzookeeper_mt) +set(INFRASTRUCTURE_LIBS -ldplog -ldpumm_mm -ldpumm_cmm -ldposen -ldposax -lmxml -ldpdiagnose -lftdsclient -losax_util) +set(PLOG_SDK_LIBS -ldplog -ldpumm_cmm -ldpumm_mm -ldposax -lmxml -ldpdiagnose -lftdsclient -losax_util) +set(INDEX_LIBS -lart -lart_repair -lkvs -lmessage -lpelagodb) +set(INDEX_STUB_LIBS -lindex_ccdb_client_stub -lkvs_ctrl_stub) + +set(CACHE_LIBS -lcache) +set(CACHE_DEP_LIBS + -ldl -liod -ldpdiagnose -lsecurec -lpatmatch -lftdsclient -ldposax -losax_util + -ldpumm_cmm -ldpumm_mm -ldptracepoint -llwt -ldplog -ldif -lmxml -lupf +) +set(EXTRA_LIBS -laio -lstdc++ -lupf -ldif -llz4 -llwt -lscpart_mgr) +set(CLI_LIBS -lcli_server_usr) +set(CONF_LIBS -lconfparser) +if(${UT}) + set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${BASE_DIR}/build/lib) + set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${BASE_DIR}/build/lib) + set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${BASE_DIR}/build/ut/cache) + add_subdirectory(test) +endif() + +message(STATUS "BASE DIRECTORY: ${BASE_DIR}") +message(STATUS "LIBRARY DIRECTORY: ${CMAKE_LIBRARY_OUTPUT_DIRECTORY}") + +# +add_subdirectory(src) + diff --git a/NOTICE.txt b/NOTICE.txt new file mode 100644 index 0000000..608f71d --- /dev/null +++ b/NOTICE.txt @@ -0,0 +1,9 @@ +This software contains files from the open source project ceph 14.2.8, most of which is dual licensed under the LGPL version 2.1. +Name: ceph +Maintainer: Sage Weil +Source: http://ceph.com/ + +Files: src/server_adaptor/ClassHandler.h; src/server_adaptor/ClassHandler.cc; src/server_adaptor/class_api.cc; +src/server_adaptor/objclass.h +Copyright: (c) 2004-2010 by Sage Weil +License: LGPL-2.1 or LGPL-3 (see COPYING-LGPL2.1) diff --git a/build.sh b/build.sh new file mode 100644 index 0000000..f6c294b --- /dev/null +++ b/build.sh @@ -0,0 +1,58 @@ +#!/bin/bash +# Copyright © Huawei Technologies Co., Ltd. 2021-2021. All rights reserved. +# Description: The script of building global_cache_adaptor +set -ex +set -o pipefail +debug=$1 +FILE_PATH="$(readlink -f $(dirname $0))" +BUILD_DIR="${FILE_PATH}/build" +ROOT_DIR=${FILE_PATH} +CMAKE_ROOT_DIR=${ROOT_DIR} +TEST_BIN_DIR="${FILE_PATH}/test/bin" + +cpu_type=$(uname -m) + +main() +{ + rm -rf ${BUILD_DIR} + mkdir -p ${BUILD_DIR} + rm -rf ${TEST_BIN_DIR} + + cd ${BUILD_DIR} + if type cmake3 > /dev/null 2>&1 ; then + CMAKE=cmake3 + echo "Using cmake3." + else + CMAKE=cmake + echo "Using cmake." + fi + if [ "$debug" = "DEBUG" ] ; then + echo "Build global_cache_adaptor DEBUG." + if [ "${cpu_type}" = "aarch64" ] ; then + ${CMAKE} ${CMAKE_ROOT_DIR} -DCMAKE_SKIP_RPATH=true + else + ${CMAKE} ${CMAKE_ROOT_DIR} -DCMAKE_SKIP_RPATH=true -DPROXY_ONLY=true -DCPU_TYPE=${cpu_type} + fi + elif [ "$debug" = "ASAN" ] ; then + echo "Build global_cache_adaptor DEBUG with ASAN." + if [ "${cpu_type}" = "aarch64" ] ; then + ${CMAKE} ${CMAKE_ROOT_DIR} -DCMAKE_SKIP_RPATH=true -DUSE_ASAN=True + else + ${CMAKE} ${CMAKE_ROOT_DIR} -DCMAKE_SKIP_RPATH=true -DPROXY_ONLY=true -DCPU_TYPE=${cpu_type} + fi + else + echo "Build global_cache_adaptor RELEASE." + if [ "${cpu_type}" = "aarch64" ] ; then + ${CMAKE} ${CMAKE_ROOT_DIR} -DCMAKE_BUILD_TYPE=RELEASE -DCMAKE_SKIP_RPATH=true + else + ${CMAKE} ${CMAKE_ROOT_DIR} -DCMAKE_BUILD_TYPE=RELEASE -DCMAKE_SKIP_RPATH=true -DPROXY_ONLY=true -DCPU_TYPE=${cpu_type} + fi + fi + make -j16 +} + +main $* +exit 0 + + + diff --git a/package/globalcache-adaptorlib_pack.sh b/package/globalcache-adaptorlib_pack.sh new file mode 100644 index 0000000..0c91dbb --- /dev/null +++ b/package/globalcache-adaptorlib_pack.sh @@ -0,0 +1,39 @@ +#!/bin/bash +set -e + +curd=$(pwd) +globalcache_adapter_dir=$curd/../ + +tar_mode=$1 + +cpu_type=$(uname -m) + +function initialize() +{ + if [[ -d "$curd/globalcache-adaptorlib" ]]; then + rm -rf $curd/globalcache-adaptorlib-${cpu_type} + fi + + mkdir -p $curd/globalcache-adaptorlib-${cpu_type} + + # lianjieku + cp -rf $globalcache_adapter_dir/build/lib/* $curd/globalcache-adaptorlib-${cpu_type} +} + +function pack_lib() +{ + if [[ $tar_mode == "DEBUG" ]] + then + tar -zcvf globalcache-adaptorlib-debug-oe1.${cpu_type}.tar.gz globalcache-adaptorlib-${cpu_type} + else + tar -zcvf globalcache-adaptorlib-release-oe1.${cpu_type}.tar.gz globalcache-adaptorlib-${cpu_type} + fi +} + +function main() +{ + initialize + pack_lib +} + +main $@ diff --git a/patch/README.md b/patch/README.md new file mode 100644 index 0000000..0827eb4 --- /dev/null +++ b/patch/README.md @@ -0,0 +1 @@ +# global_cache_adapter patch for ceph diff --git a/patch/ceph-global-cache-tls.patch b/patch/ceph-global-cache-tls.patch new file mode 100644 index 0000000..bc16652 --- /dev/null +++ b/patch/ceph-global-cache-tls.patch @@ -0,0 +1,724 @@ +diff --git a/CMakeLists.txt b/CMakeLists.txt +index f068082..567ea3f 100644 +--- a/CMakeLists.txt ++++ b/CMakeLists.txt +@@ -3,6 +3,8 @@ cmake_minimum_required(VERSION 3.5.1) + project(ceph CXX C ASM) + set(VERSION 14.2.8) + ++link_directories(/opt/gcache_adaptor_compile/third_part/lib/) ++include_directories(/opt/gcache_adaptor_compile/third_part/inc/) + if(POLICY CMP0028) + cmake_policy(SET CMP0028 NEW) + endif() +diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt +index 28ec983..40090cb 100644 +--- a/src/CMakeLists.txt ++++ b/src/CMakeLists.txt +@@ -394,10 +394,10 @@ if(WITH_DPDK) + endif() + + add_library(common STATIC ${ceph_common_objs}) +-target_link_libraries(common ${ceph_common_deps}) ++target_link_libraries(common ${ceph_common_deps} -lssl -lcert -ldplog -ldposax -lconfparser -lmxml -lkmcext -lkmc -lsdp) + + add_library(ceph-common SHARED ${ceph_common_objs}) +-target_link_libraries(ceph-common ${ceph_common_deps}) ++target_link_libraries(ceph-common ${ceph_common_deps} -lssl -lcert -ldplog -ldposax -lconfparser -lmxml -lkmcext -lkmc -lsdp) + # appease dpkg-shlibdeps + set_target_properties(ceph-common PROPERTIES + SOVERSION 0 +diff --git a/src/msg/async/PosixStack.cc b/src/msg/async/PosixStack.cc +index e9c8d40..527c635 100644 +--- a/src/msg/async/PosixStack.cc ++++ b/src/msg/async/PosixStack.cc +@@ -13,7 +13,6 @@ + * Foundation. See file COPYING. + * + */ +- + #include + #include + #include +@@ -33,10 +32,254 @@ + #include "include/compat.h" + #include "include/sock_compat.h" + ++ ++#define RETRY_TIME 10000 ++#define MAX_VERIFY_DEPTH 10 ++ + #define dout_subsys ceph_subsys_ms + #undef dout_prefix + #define dout_prefix *_dout << "PosixStack " + ++ ++class TlsPosixConnectedSocketImpl final : public ConnectedSocketImpl { ++ NetHandler &handler; ++ int _fd; ++ entity_addr_t sa; ++ bool connected; ++ SSL *ssl; ++ bool is_server; ++ CephContext *cct; ++ bool has_handShaked; ++ bool is_handShaked; ++ int retry_count; ++ public: ++ explicit TlsPosixConnectedSocketImpl(NetHandler &h, const entity_addr_t &sa, int f, bool connected, SSL *ssl, bool is_server, CephContext *cct, bool is_handShaked) ++ : handler(h), _fd(f), sa(sa), connected(connected), ssl(ssl), is_server(is_server), cct (cct), is_handShaked(is_handShaked), has_handShaked(false), retry_count(0) ++ { ++ ldout(cct, 3) << "TLS Socket:" << _fd << dendl; ++ if (ssl == nullptr) { ++ lderr(cct) << __func__ << " ssl nullptr " << dendl; ++ } ++ } ++ ++ int is_connected() override { ++ if (connected) ++ return 1; ++ ++ int r = handler.reconnect(sa, _fd); ++ ++ if (r == 0) { ++ if (is_handShaked == false) { ++ int ret = SSL_connect(ssl); ++ if (ret > 0) { ++ is_handShaked = true; ++ ldout(cct, 20) << " TLS Socket " << _fd << " reconnect success" << dendl; ++ } else { ++ ldout(cct, 20) << " Socket " << _fd << " reconnect success, but TLS Socket reconnect failed" << dendl; ++ } ++ } ++ connected = true; ++ return 1; ++ } else if (r < 0) { ++ return r; ++ lderr(cct) << "TLS Socket " << _fd << "reconnect fail " << r << dendl; ++ } else { ++ return 0; ++ lderr(cct) << "TLS Socket " << _fd << "reconnect fail 0" << dendl; ++ } ++ } ++ ++ int try_handShaked() { ++ if (is_handShaked) { ++ return 1; ++ } ++ if (retry_count++ > RETRY_TIME) { ++ lderr(cct) << __func__ << " ssl accept failed error " << "retry time" << retry_count << dendl; ++ return -1; ++ } ++ ++ int ret = 0; ++ if (is_server) { ++ ret = SSL_accept(ssl); ++ if (ret <= 0) { ++ ldout(cct, 3) << "ssl accept failed fd: " << _fd << " retry time: " << retry_count << dendl; ++ return 0; ++ } ++ ldout(cct, 1) << "ssl accept success fd: " << _fd << " retry time: " << retry_count << dendl; ++ is_handShaked = true; ++ return 1; ++ } else { ++ ret = SSL_connect(ssl); ++ if (ret <= 0) { ++ ldout(cct, 3) << "ssl connect failed fd: " << _fd << " retry time: " << retry_count << dendl; ++ return 0; ++ } ++ ldout(cct, 1) << "ssl connect success fd: " << _fd << " retry time: " << retry_count << dendl; ++ is_handShaked = true; ++ return 1; ++ } ++ } ++ ++ ssize_t zero_copy_read(bufferptr&) override { ++ return -EOPNOTSUPP; ++ } ++ ++ ssize_t read(char *buf, size_t len) override { ++ int try_ret = try_handShaked(); ++ if (try_ret == -1) { ++ lderr(cct) << __func__ << " TLS handShaked failed" << " fd: " << _fd << dendl; ++ return -1; ++ } else if (try_ret == 0) { ++ return -11; ++ } ++ ERR_clear_error(); ++ ssize_t r = SSL_read(ssl, buf, len); ++ if (r <= 0) { ++ int read_errno = errno; ++ int ssl_errno = SSL_get_error(ssl, r); ++ long error = ERR_get_error(); ++ const char* error_string = ERR_error_string(error, NULL); ++ if (read_errno == 11) { ++ ldout(cct, 4) << "TLS read fail, r = " << r << " errno = " << read_errno << "len = " << len << "fd: " << _fd << "ssl_errno: "<< ssl_errno << " ssl_errno_str: " << error_string <(left_pbrs, IOV_MAX); ++ left_pbrs -= size; ++ // FIPS zeroization audit 20191115: this memset is not security related. ++ memset(&msg, 0, sizeof(msg)); ++ msg.msg_iovlen = size; ++ msg.msg_iov = msgvec; ++ unsigned msglen = 0; ++ for (auto iov = msgvec; iov != msgvec + size; iov++) { ++ iov->iov_base = (void*)(pb->c_str()); ++ iov->iov_len = pb->length(); ++ msglen += pb->length(); ++ ++pb; ++ } ++ ssize_t r = do_sendmsg(_fd, msg, msglen, left_pbrs || more, ssl); ++ if (r < 0) { ++ lderr(cct) << __func__ << " TLS sent error " << SSL_get_error(ssl, r) << "fd: " << _fd << dendl; ++ return r; ++ } ++ ++ // "r" is the remaining length ++ sent_bytes += r; ++ if (static_cast(r) < msglen) ++ break; ++ // only "r" == 0 continue ++ } ++ ++ if (sent_bytes) { ++ bufferlist swapped; ++ if (sent_bytes < bl.length()) { ++ bl.splice(sent_bytes, bl.length()-sent_bytes, &swapped); ++ bl.swap(swapped); ++ } else { ++ bl.clear(); ++ } ++ } ++ ldout(cct, 10) << __func__ << " TLS sent " << sent_bytes << " bytes success" <(sent_bytes); ++ } ++ void shutdown() override { ++ ::shutdown(_fd, SHUT_RDWR); ++ } ++ void close() override { ++ ::close(_fd); ++ ldout(cct, 3) << "ssl close " << " fd: " << _fd << dendl; ++ SSL_free(ssl); ++ } ++ int fd() const override { ++ return _fd; ++ } ++ int socket_fd() const override { ++ return _fd; ++ } ++ friend class PosixServerSocketImpl; ++ friend class TlsPosixNetworkStack; ++}; ++ + class PosixConnectedSocketImpl final : public ConnectedSocketImpl { + NetHandler &handler; + int _fd; +@@ -45,7 +288,8 @@ class PosixConnectedSocketImpl final : public ConnectedSocketImpl { + + public: + explicit PosixConnectedSocketImpl(NetHandler &h, const entity_addr_t &sa, int f, bool connected) +- : handler(h), _fd(f), sa(sa), connected(connected) {} ++ : handler(h), _fd(f), sa(sa), connected(connected) { ++ } + + int is_connected() override { + if (connected) +@@ -68,13 +312,12 @@ class PosixConnectedSocketImpl final : public ConnectedSocketImpl { + + ssize_t read(char *buf, size_t len) override { + ssize_t r = ::read(_fd, buf, len); +- if (r < 0) ++ if (r < 0) { + r = -errno; ++ } + return r; + } + +- // return the sent length +- // < 0 means error occurred + static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more) + { + size_t sent = 0; +@@ -82,6 +325,7 @@ class PosixConnectedSocketImpl final : public ConnectedSocketImpl { + MSGR_SIGPIPE_STOPPER; + ssize_t r; + r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0)); ++ + if (r < 0) { + if (errno == EINTR) { + continue; +@@ -169,16 +413,22 @@ class PosixConnectedSocketImpl final : public ConnectedSocketImpl { + friend class PosixNetworkStack; + }; + ++void PosixWorker::initialize() ++{ ++} ++ + class PosixServerSocketImpl : public ServerSocketImpl { + NetHandler &handler; + int _fd; ++ SSL_CTX *ctx; ++ CephContext *cct; + + public: + explicit PosixServerSocketImpl(NetHandler &h, int f, +- const entity_addr_t& listen_addr, unsigned slot) ++ const entity_addr_t& listen_addr, unsigned slot, SSL_CTX *ctx, CephContext *cct) + : ServerSocketImpl(listen_addr.get_type(), slot), +- handler(h), _fd(f) {} +- int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out, Worker *w) override; ++ handler(h), _fd(f), ctx(ctx), cct(cct) {} ++ int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out, Worker *w); + void abort_accept() override { + ::close(_fd); + } +@@ -188,23 +438,28 @@ class PosixServerSocketImpl : public ServerSocketImpl { + }; + + int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) { ++ certParam_t certParam; ++ + ceph_assert(sock); + sockaddr_storage ss; + socklen_t slen = sizeof(ss); + int sd = accept_cloexec(_fd, (sockaddr*)&ss, &slen); + if (sd < 0) { ++ ldout(cct, 3) << " accept_cloexec failed " << " errno: "<< errno << dendl; + return -errno; + } + + int r = handler.set_nonblock(sd); + if (r < 0) { + ::close(sd); ++ ldout(cct, 3) << " set_nonblock failed " <set_type(addr_type); + out->set_sockaddr((sockaddr*)&ss); ++ entity_addr_t local_addr; ++ sockaddr_storage localS; ++ socklen_t llen = sizeof(localS); ++ int rc = getsockname(_fd, (sockaddr*)&localS, &llen); ++ if (rc < 0) { ++ lderr(cct) << __func__ << " get port failed " << dendl; ++ } ++ local_addr.set_sockaddr((sockaddr*)&localS); + handler.set_priority(sd, opt.priority, out->get_family()); + +- std::unique_ptr csi(new PosixConnectedSocketImpl(handler, *out, sd, true)); +- *sock = ConnectedSocket(std::move(csi)); +- return 0; +-} ++ if (certParamInit(&certParam) != 0) { ++ lderr(cct) << " cert param init failed" << " abort" << dendl; ++ ceph_abort(); ++ } + +-void PosixWorker::initialize() +-{ ++ if ((certParam.tlsStatus == 1) && (local_addr.get_port() >= 7880 && local_addr.get_port() <= 7889)) { ++ SSL *sslNew = SSL_new(ctx); ++ if (sslNew == nullptr) { ++ ::close(sd); ++ lderr(cct) << __func__ << " ssl nullptr " << dendl; ++ return -errno; ++ } ++ ldout(cct, 3) << __func__ << " TLS accept fd: " << sd << " port: " << local_addr.get_port() << " SSL Version: " << SSL_get_version(sslNew) < 0) { ++ is_handShaked = true; ++ ldout(cct, 3) << "ssl accept success fd: " << sd << " connect port: " << local_addr.get_port() << dendl; ++ } ++ std::unique_ptr csi(new TlsPosixConnectedSocketImpl(handler, *out, sd, true, sslNew, true, cct, is_handShaked)); ++ *sock = ConnectedSocket(std::move(csi)); ++ } else { ++ ldout(cct, 3) << " TCP accept success "<< "fd: " << sd < csi(new PosixConnectedSocketImpl(handler, *out, sd, true)); ++ *sock = ConnectedSocket(std::move(csi)); ++ } ++ return 0; + } + + int PosixWorker::listen(entity_addr_t &sa, +@@ -232,7 +521,6 @@ int PosixWorker::listen(entity_addr_t &sa, + if (listen_sd < 0) { + return -errno; + } +- + int r = net.set_nonblock(listen_sd); + if (r < 0) { + ::close(listen_sd); +@@ -248,7 +536,7 @@ int PosixWorker::listen(entity_addr_t &sa, + r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len()); + if (r < 0) { + r = -errno; +- ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr() ++ ldout(cct, 3) << __func__ << " unable to bind to " << sa.get_sockaddr() + << ": " << cpp_strerror(r) << dendl; + ::close(listen_sd); + return r; +@@ -264,13 +552,12 @@ int PosixWorker::listen(entity_addr_t &sa, + + *sock = ServerSocket( + std::unique_ptr( +- new PosixServerSocketImpl(net, listen_sd, sa, addr_slot))); ++ new PosixServerSocketImpl(net, listen_sd, sa, addr_slot, ctx, cct))); + return 0; + } + + int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) { + int sd; +- + if (opts.nonblock) { + sd = net.nonblock_connect(addr, opts.connect_bind_addr); + } else { +@@ -282,8 +569,34 @@ int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, C + } + + net.set_priority(sd, opts.priority, addr.get_family()); +- *socket = ConnectedSocket( ++ ++ ldout(cct, 3) << "fd:" << sd << " connect port: " << addr.get_port() << " tlsStatus " << tlsParam.tlsStatus << dendl; ++ if ((tlsParam.tlsStatus == 1) && (addr.get_port() >= tlsParam.portIdStart && addr.get_port() <= tlsParam.portIdEnd)) { ++ SSL *sslNew = SSL_new(ctx); ++ if (sslNew == nullptr) { ++ lderr(cct) << __func__ << " ssl new failed " << dendl; ++ return -errno; ++ } ++ ldout(cct, 3) << "SSL Version: " << SSL_get_version(sslNew) << dendl; ++ if (SSL_set_fd(sslNew, sd) <= 0) { ++ lderr(cct) << __func__ << " ctx set fd failed " << dendl; ++ return -errno; ++ } ++ int ret = SSL_connect(sslNew); ++ bool is_handShaked = false; ++ if (ret > 0) { ++ is_handShaked = true; ++ ldout(cct, 3) << "ssl connect success fd: " << sd << "connect port: " << addr.get_port() << dendl; ++ } else { ++ ldout(cct, 3) << "ssl connect skip fd=" << sd << " headshake=" << is_handShaked << dendl; ++ } ++ ++ *socket = ConnectedSocket( ++ std::unique_ptr(new TlsPosixConnectedSocketImpl(net, addr, sd, !opts.nonblock, sslNew, false, cct, is_handShaked))); ++ } else { ++ *socket = ConnectedSocket( + std::unique_ptr(new PosixConnectedSocketImpl(net, addr, sd, !opts.nonblock))); ++ } + return 0; + } + +diff --git a/src/msg/async/PosixStack.h b/src/msg/async/PosixStack.h +index f1aaccd..0a240b2 100644 +--- a/src/msg/async/PosixStack.h ++++ b/src/msg/async/PosixStack.h +@@ -18,18 +18,31 @@ + #define CEPH_MSG_ASYNC_POSIXSTACK_H + + #include ++#include ++#include ++#include ++#include ++#include ++#include ++#include ++#include + + #include "msg/msg_types.h" + #include "msg/async/net_handler.h" + + #include "Stack.h" ++#include "cert_ceph.h" ++ + + class PosixWorker : public Worker { + NetHandler net; + void initialize() override; ++ SSL_CTX *ctx; ++ certParam_t tlsParam; + public: +- PosixWorker(CephContext *c, unsigned i) +- : Worker(c, i), net(c) {} ++ PosixWorker(CephContext *c, unsigned i, SSL_CTX *ctx, certParam_t tlsParam) ++ : Worker(c, i), net(c) , ctx(ctx), tlsParam(tlsParam){ ++ } + int listen(entity_addr_t &sa, + unsigned addr_slot, + const SocketOptions &opt, +diff --git a/src/msg/async/Stack.cc b/src/msg/async/Stack.cc +index 8976c3c..7f7185d 100644 +--- a/src/msg/async/Stack.cc ++++ b/src/msg/async/Stack.cc +@@ -29,6 +29,7 @@ + + #include "common/dout.h" + #include "include/ceph_assert.h" ++#include "cert_ceph.h" + + #define dout_subsys ceph_subsys_ms + #undef dout_prefix +@@ -82,10 +83,10 @@ std::shared_ptr NetworkStack::create(CephContext *c, const string + return nullptr; + } + +-Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned i) ++Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned i, SSL_CTX *ctx, certParam_t tlsParam) + { + if (type == "posix") +- return new PosixWorker(c, i); ++ return new PosixWorker(c, i, ctx, tlsParam); + #ifdef HAVE_RDMA + else if (type == "rdma") + return new RDMAWorker(c, i); +@@ -105,6 +106,31 @@ NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(fa + { + ceph_assert(cct->_conf->ms_async_op_threads > 0); + ++ tlsParam = {0}; ++ if (certParamInit(&tlsParam) != 0) { ++ lderr(cct) << __func__ << " tlsParam init failed " << dendl; ++ } ++ ldout(cct, 3) << __func__ << " tlsParam init success " << dendl; ++ if (tlsParam.tlsStatus == 1) { ++ if (checkCertTask(&tlsParam) != 0) { ++ printf("check cert failed, exit\n"); ++ lderr(cct) << __func__ << " check cert failed " << dendl; ++ exit(1); ++ } ++ if (sslctxInit(&tlsParam, &ctx) != 0) { ++ printf("ctx init failed, exit\n"); ++ lderr(cct) << __func__ << " ctx init failed " << dendl; ++ exit(1); ++ } ++ if (backgroundCertMonitorInit(&tlsParam) != 0) { ++ printf("init cert monitor thread failed, exit\n"); ++ lderr(cct) << __func__ << " init cert monitor thread failed " << dendl; ++ } ++ SSL_CTX_set_mode(ctx, SSL_MODE_ENABLE_PARTIAL_WRITE); ++ SSL_CTX_set_mode(ctx, SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER); ++ ldout(cct, 3) << __func__ << " ctx init success " << dendl; ++ } ++ + const int InitEventNumber = 5000; + num_workers = cct->_conf->ms_async_op_threads; + if (num_workers >= EventCenter::MAX_EVENTCENTER) { +@@ -116,7 +142,7 @@ NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(fa + } + + for (unsigned i = 0; i < num_workers; ++i) { +- Worker *w = create_worker(cct, type, i); ++ Worker *w = create_worker(cct, type, i, ctx, tlsParam); + w->center.init(InitEventNumber, i, type); + workers.push_back(w); + } +diff --git a/src/msg/async/Stack.h b/src/msg/async/Stack.h +index a093dad..45415b1 100644 +--- a/src/msg/async/Stack.h ++++ b/src/msg/async/Stack.h +@@ -21,6 +21,7 @@ + #include "common/perf_counters.h" + #include "msg/msg_types.h" + #include "msg/async/Event.h" ++#include "cert_ceph.h" + + class Worker; + class ConnectedSocketImpl { +@@ -308,21 +309,27 @@ class NetworkStack { + protected: + CephContext *cct; + vector workers; ++ SSL_CTX *ctx; ++ certParam_t tlsParam; + + explicit NetworkStack(CephContext *c, const string &t); + public: + NetworkStack(const NetworkStack &) = delete; + NetworkStack& operator=(const NetworkStack &) = delete; + virtual ~NetworkStack() { +- for (auto &&w : workers) ++ for (auto &&w : workers) { + delete w; ++ } ++ if ((ctx != nullptr) && (tlsParam.tlsStatus == 1)) { ++ SSL_CTX_free(ctx); ++ } + } + + static std::shared_ptr create( + CephContext *c, const string &type); + + static Worker* create_worker( +- CephContext *c, const string &t, unsigned i); ++ CephContext *c, const string &t, unsigned i, SSL_CTX *ctx, certParam_t tlsParam); + // backend need to override this method if supports zero copy read + virtual bool support_zero_copy_read() const { return false; } + // backend need to override this method if backend doesn't support shared +diff --git a/src/msg/async/cert_ceph.h b/src/msg/async/cert_ceph.h +new file mode 100644 +index 0000000..a60ea0a +--- /dev/null ++++ b/src/msg/async/cert_ceph.h +@@ -0,0 +1,62 @@ ++/* ++Copyright (c) 2021 Huawei Technologies Co., Ltd All rights reserved. ++ ++Licensed under the Apache License, Version 2.0 (the "License"); ++you may not use this file except in compliance with the License. ++You may obtain a copy of the License at ++ ++ http://www.apache.org/licenses/LICENSE-2.0 ++ ++Unless required by applicable law or agreed to in writing, software ++distributed under the License is distributed on an "AS IS" BASIS, ++WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++See the License for the specific language governing permissions and ++limitations under the License. ++*/ ++ ++#ifndef _CERT_CEPH_H_ ++#define _CERT_CEPH_H_ ++ ++#include ++#include ++#include ++#include ++#include ++ ++#ifdef __cplusplus ++extern "C" { ++#endif ++ ++#define MAX_CIPHER_NAME 50 ++ ++typedef struct certParam { ++ int8_t tlsStatus; ++ int32_t tlsVersion; ++ char tlsCipherList[MAX_CIPHER_NAME]; ++ char caFilePath[PATH_MAX]; ++ char keypassFilePath[PATH_MAX]; ++ char agentCertFilePath[PATH_MAX]; ++ char publicKeyFilePath[PATH_MAX]; ++ char privateKeyFilePath[PATH_MAX]; ++ char revokeCrlFilePath[PATH_MAX]; ++ char kmcPrimaryKsfPath[PATH_MAX]; ++ char kmcStandbyKsfPath[PATH_MAX]; ++ int32_t maxConnect; ++ int32_t portIdStart; ++ int32_t portIdEnd; ++ int32_t certCheckPeriodDays; ++ int32_t certCheckWarnningDays; ++} certParam_t; ++ ++int32_t certParamInit(certParam_t *param); ++int32_t checkCertTask(certParam_t *param); ++int32_t sslctxInit(certParam_t *param, SSL_CTX **ctx); ++int32_t backgroundCertMonitorInit(certParam_t *param); ++ ++#define MAX_CONNECT_COUNT 40960 ++ ++#ifdef __cplusplus ++} ++#endif ++ ++#endif diff --git a/patch/ceph-global-cache.patch b/patch/ceph-global-cache.patch new file mode 100644 index 0000000..0c82e1a --- /dev/null +++ b/patch/ceph-global-cache.patch @@ -0,0 +1,5839 @@ +diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt +index 28ec9835..bf4aa6e8 100644 +--- a/src/CMakeLists.txt ++++ b/src/CMakeLists.txt +@@ -103,6 +103,11 @@ if(HAVE_INTEL) + HAVE_BETTER_YASM_ELF64) + endif() + ++if(WITH_GLOBAL_CACHE) ++set(third_part_dir /opt/gcache_adaptor_compile/third_part) ++message(STATUS "third part directory --> " ${third_part_dir}) ++link_directories(${third_part_dir}/lib) ++endif() + + # require c++17 + if(CMAKE_VERSION VERSION_LESS "3.8") +@@ -393,6 +398,10 @@ if(WITH_DPDK) + list(APPEND ceph_common_deps common_async_dpdk) + endif() + ++if(WITH_GLOBAL_CACHE) ++ list(APPEND ceph_common_deps ceph_client_adaptor_plugin) ++endif() ++ + add_library(common STATIC ${ceph_common_objs}) + target_link_libraries(common ${ceph_common_deps}) + +@@ -560,6 +569,12 @@ add_subdirectory(dmclock) + + add_subdirectory(compressor) + ++# Client adaptor ++if(WITH_GLOBAL_CACHE) ++message(STATUS "Client adaptor cmake executing...") ++add_subdirectory(client_adaptor) ++endif() ++ + add_subdirectory(tools) + + if(WITH_TESTS) +diff --git a/src/client_adaptor/CMakeLists.txt b/src/client_adaptor/CMakeLists.txt +new file mode 100644 +index 00000000..aba70f7e +--- /dev/null ++++ b/src/client_adaptor/CMakeLists.txt +@@ -0,0 +1,17 @@ ++set(client_adaptor_srcs ++ ClientAdaptorMsg.cc ++ ClientAdaptorMgr.cc ++ ClientAdaptorPerf.cc ++ ClientAdaptorPlugin.cc ++) ++ ++add_library(ceph_client_adaptor_plugin SHARED ${client_adaptor_srcs}) ++message(STATUS "In cliend adaptor third part directory --> " ${third_part_dir}) ++target_link_libraries(ceph_client_adaptor_plugin osdc agent_client_lib das) ++ ++ ++set(client_adaptor_dir ${CEPH_INSTALL_PKGLIBDIR}) ++install(TARGETS ceph_client_adaptor_plugin DESTINATION ${client_adaptor_dir}) ++ ++message(STATUS "Global Cache client-adaptor cmake executing...") ++ +diff --git a/src/client_adaptor/ClientAdaptorMgr.cc b/src/client_adaptor/ClientAdaptorMgr.cc +new file mode 100644 +index 00000000..f9ca79b1 +--- /dev/null ++++ b/src/client_adaptor/ClientAdaptorMgr.cc +@@ -0,0 +1,256 @@ ++/* License:LGPL-2.1 ++* ++* Copyright (c) 2021 Huawei Technologies Co., Ltd All rights reserved. ++* ++*/ ++ ++#include ++#include "ClientAdaptorMgr.h" ++ ++void ClientAdaptorMgr::set_init_flag(bool flag){ ++ init_flag = flag; ++ return; ++} ++ ++const bool ClientAdaptorMgr::is_init_succeed(){ ++ if(init_flag){ ++ return true; ++ } else { ++ return false; ++ } ++} ++ ++int32_t CcmPtChangeNotify(PTViewPtEntry *entry, uint32_t entryNum, void *ctx) ++{ ++ if (entryNum == 0) { ++ return RET_OK; ++ } ++ std::vector normal_pt; ++ for (uint32_t i = 0; i < entryNum; i++) { ++ if (entry[i].state == CCM_PT_STATE_OK) { ++ normal_pt.push_back(entry[i].ptId); ++ } ++ } ++ if (normal_pt.size() == 0) { ++ return RET_OK; ++ } ++ Objecter *obj = static_cast(ctx); ++ obj->retry_op_submit(normal_pt); ++ return RET_OK; ++} ++ ++int32_t CcmNodeChangeNotify(NodeInfo *nodeList, uint32_t nodeNum, void *ctx) { ++ if (nodeNum == 0) { ++ return RET_OK; ++ } ++ std::set available_nodes; ++ for (uint32_t i = 0; i < nodeNum; i++) { ++ if (nodeList[i].state == NODE_STATE_UP) { ++ available_nodes.insert(nodeList[i].nodeId); ++ } ++ } ++ if (nodeNum - available_nodes.size() == 0) { ++ return RET_OK; ++ } ++ Objecter *obj = static_cast(ctx); ++ obj->nodeview_change_retry_op_submit(available_nodes); ++ return RET_OK; ++} ++ ++void ClientAdaptorCcm::ccm_deregister(Objecter *obj) ++{ ++ if (register_objs.count(obj) > 0) { ++ OpenDeregisterViewChangeNotifyChain(register_objs[obj]); ++ OpenDeregisterNodeViewChangeNotifyChain(register_node_change_objs[obj]); ++ delete register_objs[obj]; ++ delete register_node_change_objs[obj]; ++ register_objs.erase(obj); ++ register_node_change_objs.erase(obj); ++ } ++ if (register_objs.empty() && is_init_succeed()) { ++ set_init_flag(false); ++ } ++} ++ ++bool ClientAdaptorCcm::ccm_callback_register(Objecter *obj) ++{ ++ PTViewChangeOpHandle *ccmCallback = new PTViewChangeOpHandle(); ++ NodeViewChangeOpHandle *ccmNodeChangeCallback = new NodeViewChangeOpHandle(); ++ register_objs[obj] = ccmCallback; ++ register_node_change_objs[obj] = ccmNodeChangeCallback; ++ ccmCallback->notifyPtChange = CcmPtChangeNotify; ++ ccmNodeChangeCallback->notifyNodeChange = CcmNodeChangeNotify; ++ ccmCallback->ctx = (void *)obj; ++ ccmNodeChangeCallback->ctx = (void *)obj; ++ if (OpenRegisterViewChangeNotifyChain(ccmCallback) || OpenRegisterNodeViewChangeNotifyChain(ccmNodeChangeCallback)) { ++ std::cout << __func__ << " Client Adaptor: CCM agent register failed" << std::endl; ++ return false; ++ } ++ return true; ++} ++ ++/* ++ * Init manager , failed to return !0 ++*/ ++int32_t ClientAdaptorCcm::init_mgr(Objecter *obj){ ++ int32_t ret = 0; ++ //Already init ++ if (is_init_succeed()){ ++ if (!ccm_callback_register(obj)) { ++ return RET_CCM_REGISTER_ERROR; ++ } ++ return RET_OK; ++ } ++ ++ ret = OpenBcmInit(); ++ if (ret != 0) { ++ std::cout << __func__ << " ccm agint init failed, ret=" << ret << std::endl; ++ return RET_CCM_AGENT_INIT_ERROR; ++ } ++ ++ if (!ccm_callback_register(obj)){ ++ return RET_CCM_REGISTER_ERROR; ++ } ++ return RET_OK; ++} ++ ++int32_t ClientAdaptorCcm::get_pt_num(int32_t clusterId, uint32_t& num){ ++ num = OpenGetTotalPtNum(clusterId); ++ return RET_OK; ++} ++ ++int32_t ClientAdaptorCcm::get_pt_entry(int32_t clusterId, uint32_t pt_index, PTViewPtEntry* entry){ ++ if (OpenGetPtEntry(clusterId, pt_index, entry)){ ++ std::cout << __func__ << " Client Adaptor: Get PT entry failed" << std::endl; ++ return RET_CCM_PT_ENTRY_ERROR; ++ } ++ return RET_OK; ++} ++ ++int32_t ClientAdaptorCcm::get_node_info(int32_t clusterId, uint32_t node_id, NodeInfo* node_info){ ++ using namespace std::chrono; ++ int32_t ret = 0; ++ seconds timeout {50 * 60}; ++ ++ steady_clock::time_point begin = steady_clock::now(); ++ while (true) { ++ ret = OpenAgentGetNodeInfo(clusterId, node_id, node_info); ++ if (ret == 0) { ++ break; ++ } ++ ++ steady_clock::time_point end = steady_clock::now(); ++ seconds time_span = duration_cast(end - begin); ++ ++ if (time_span > timeout) { ++ std::cout << __func__ << " Client Adaptor: get node info failed because of timeout" ++ << ", timeout=" << timeout << std::endl; ++ break; ++ } ++ sleep(1); ++ } ++ if (ret){ ++ std::cout << __func__ << " Client Adaptor: Get node infomation failed" << std::endl; ++ return RET_CCM_NODE_INFO_ERROR; ++ } ++ ++ return RET_OK; ++} ++ ++bool ClientAdaptorCcm::get_pt_status(int32_t clusterId, uint32_t pt_id) { ++ bool ret = true; ++ PTViewPtEntry pt_entry = { 0 }; ++ if (get_pt_entry(clusterId, pt_id, &pt_entry)) { ++ std::cout << __func__ << " Client Adaptor: Get PT entry failed" << std::endl; ++ return false; ++ } ++ ++ if (pt_entry.state != CCM_PT_STATE_OK) { ++ return false; ++ } ++ return ret; ++} ++ ++int32_t ClientAdaptorCcm::add_snap_to_gc(int64_t md_pool_id, ++ int64_t data_pool_id, ++ const std::string &image_id, ++ uint64_t snap_id) { ++ int32_t ret = OpenCreateSnapshot(md_pool_id, data_pool_id, image_id.c_str(), snap_id); ++ if (ret < 0) { ++ std::cout << __func__ << " Client Adaptor: Add to gc failed, snap_id=" << snap_id ++ << " ret="<< ret << std::endl; ++ return ret; ++ } ++ return 0; ++} ++ ++int32_t ClientAdaptorCcm::remove_snap_from_gc(int64_t data_pool_id, ++ const std::string &name_space, ++ const std::string &image_id, ++ uint64_t snap_id) { ++ int32_t ret = OpenDeleteSnapshot(data_pool_id, name_space.c_str(), image_id.c_str(), snap_id); ++ if (ret < 0) { ++ std::cout << __func__ << " Client Adaptor: Remove from gc failed, snap_name=" << data_pool_id << "/" << image_id ++ << "@" << snap_id << " ret="<< ret << std::endl; ++ return ret; ++ } ++ return 0; ++} ++ ++int32_t ClientAdaptorCcm::remove_gc_image_resource(int64_t data_pool_id, const std::string image_id) { ++ int32_t ret = OpenReleaseImageResource(data_pool_id, image_id.c_str()); ++ if (ret < 0) { ++ std::cout << __func__ << " Client Adaptor: Remove image resource failed, image_id=" << image_id ++ << " ret="<< ret << std::endl; ++ return ret; ++ } ++ return 0; ++} ++ ++int32_t ClientAdaptorCcm::get_node_from_ma(const int64_t pool_id, int32_t *nodeId) { ++ return OpenAgentInit(pool_id, nodeId); ++} ++ ++int32_t ClientAdaptorCcm::rollback_gc_snap(int64_t md_pool_id, int64_t data_pool_id, ++ const std::string image_id, uint64_t num_objs, ++ uint64_t snap_seq, uint64_t rb_snap_id, ++ uint64_t tp_snap_id1, uint64_t tp_snap_id2) { ++ RollbackInfo info; ++ info.mdPoolId = md_pool_id; ++ info.dataPoolId = data_pool_id; ++ info.imageId = image_id.c_str(); ++ info.numObjs = num_objs; ++ info.snapId = rb_snap_id; ++ info.oldHeadSnapId = tp_snap_id1; ++ info.rollbackSnapId = tp_snap_id2; ++ info.snapSeq = snap_seq; ++ int ret = OpenRollbackSnapshot(&info); ++ if (ret < 0) { ++ std::cout << __func__ << "Client Adaptor: register rollback list failed, snap=" ++ << md_pool_id << "-" << data_pool_id << "/" << image_id ++ << "@" << rb_snap_id << " ret="<< ret << std::endl; ++ return ret; ++ } ++ return 0; ++} ++ ++int32_t ClientAdaptorCcm::gc_is_rollbacking(int64_t md_pool_id, int64_t data_pool_id, const std::string image_id) ++{ ++ int ret = OpenImageBusy(data_pool_id, image_id.c_str()); ++ if (ret < 0) { ++ std::cout << __func__ << "Client Adaptor: query gc rollbacking failed, image=" << md_pool_id ++ << "(" << data_pool_id << ")/" < register_objs; ++ std::map register_node_change_objs; ++ bool ccm_callback_register(Objecter *obj); ++}; ++ ++class ClientAdaptorLocal : public ClientAdaptorMgr { ++public: ++ ClientAdaptorLocal(){} ++ ~ClientAdaptorLocal() override {} ++ ++ int32_t init_mgr(Objecter *obj){ ++ return 0; ++ } ++ ++ int32_t get_pt_num(int32_t clusterId, uint32_t& num){ ++ num = 10; ++ return 0; ++ } ++ ++ int32_t get_pt_entry(int32_t clusterId, uint32_t pt_index, PTViewPtEntry* entry){ ++ entry->curNodeInfo.nodeId = pt_index % 3; ++ return 0; ++ } ++ ++ int32_t get_node_info(int32_t clusterId, uint32_t node_id, NodeInfo* node_info){ ++ strcpy(node_info->publicAddrStr, "localhost"); ++ node_info->ports[0] = 1234; ++ node_info->portNum = 1; ++ return 0; ++ } ++ ++ int32_t add_snap_to_gc(int64_t md_pool_id, int64_t data_pool_id, const std::string &image_id, uint64_t snap_id) { ++ return 0; ++ } ++ ++ int32_t remove_snap_from_gc(int64_t data_pool_id, const std::string &name_space, ++ const std::string &image_id, uint64_t snap_id) { ++ return 0; ++ } ++ ++ int32_t remove_gc_image_resource(const int64_t pool_id, const std::string image_id) { ++ return 0; ++ } ++ int32_t get_node_from_ma(const int64_t pool_id, int32_t *nodeId) { ++ return 0; ++ } ++ int32_t rollback_gc_snap(int64_t pool_id, int64_t data_pool_id, ++ const std::string image_id, uint64_t num_objs, uint64_t snap_seq, ++ uint64_t rb_snap_id, uint64_t tp_snap_id1, uint64_t tp_snap_id2) { ++ return 0; ++ } ++ ++ int32_t gc_is_rollbacking(int64_t md_pool_id, int64_t data_pool_id, const std::string image_id) { ++ return 0; ++ } ++ ++ int32_t gc_snap_is_rollbacking(int64_t data_pool_id, const std::string image_id, int64_t snap_id) { ++ return 0; ++ } ++ ++ const std::string name() override { ++ return "ClientAdaptorLocal"; ++ } ++ bool get_pt_status(int32_t clusterId, uint32_t pt_id) { ++ return true; ++ } ++ void ccm_deregister(Objecter *obj) {} ++ ++private: ++}; ++ ++ ++#endif +diff --git a/src/client_adaptor/ClientAdaptorMsg.cc b/src/client_adaptor/ClientAdaptorMsg.cc +new file mode 100644 +index 00000000..ee19bbd5 +--- /dev/null ++++ b/src/client_adaptor/ClientAdaptorMsg.cc +@@ -0,0 +1,348 @@ ++/* License:LGPL-2.1 ++* ++* Copyright (c) 2021 Huawei Technologies Co., Ltd All rights reserved. ++* ++*/ ++ ++#include ++#include ++#include ++#include "ClientAdaptorMsg.h" ++#include "ClientAdaptorMgr.h" ++ ++namespace { ++const int RBD_DATA_OBJECT_NAME_FILTER_LEN = 8; // rbd_data(8 bits) ++const int RBD_DATA_OBJECT_NAME_LEN = 27; // rbd_data(8 bits).image_id(n bits).object_index(16 bits) ++const string RBD_DATA_OBJECT_NAME = "rbd_data"; ++const int RGW_BUCKET_ID_LEN = 36; // bucket_id(36 bits) ++const int RGW_OBJECT_NAME_LEN = 38; // bucket_id(36 bits)_Object_Name(n bits) ++const uint64_t SEGMENT_SIZE = 4194304; ++const uint64_t SEGMENT_MASK = 0x3FFFFF; ++const int OBJECT_ID_LEN = 16; ++const int GC_PORT_MIN = 7880; ++const int GC_PORT_MAX = 7889; ++const string GC_SNAP_PREFIX = "gc_"; ++} ++ ++ClientAdaptorMsg::ClientAdaptorMsg(ClientAdaptorMgr* mgr) : mgr_ref(mgr){ ++} ++ ++void ClientAdaptorMsg::push_strategy(Objecter *objecter, uint64_t pool_id, int32_t node_id, std::string oid_name, bufferlist &indata) ++{ ++ if (objecter == NULL) { ++ std::cout << __func__ << " Objecter null " << std::endl; ++ return; ++ } ++ if (oid_name.size() == 0) { ++ std::cout << __func__ << " oid_name size 0 " << std::endl; ++ return; ++ } ++ if (node_id < 0) { ++ std::cout << __func__ << " node id < 0 " << std::endl; ++ return; ++ } ++ const char *cls = "rpc"; ++ const char *method = "das_prefetch"; ++ vector nops(1); ++ OSDOp &op = nops[0]; ++ op.op.op = CEPH_OSD_OP_CALL; ++ op.op.cls.class_len = strlen(cls); ++ op.op.cls.method_len = strlen(method); ++ op.op.cls.indata_len = indata.length(); ++ op.indata.append(cls, op.op.cls.class_len); ++ op.indata.append(method, op.op.cls.method_len); ++ op.indata.append(indata); ++ Objecter::Op *objecter_op = ++ new Objecter::Op(object_t(oid_name), object_locator_t(), nops, CEPH_OSD_FLAG_EXEC, NULL, NULL, NULL, nullptr); ++ objecter_op->target.osd = node_id; ++ objecter_op->target.base_oloc.pool = pool_id; ++ objecter_op->target.base_oid.name = oid_name; ++ objecter_op->target.flags = CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE; ++ ++ objecter->op_submit(objecter_op); ++} ++ ++/** ++ * Maximum string length of the RBD block object name prefix (not including ++ * null termination). ++ */ ++bool ClientAdaptorMsg::filter_msg(Objecter::op_target_t *t){ ++ string obj_name = t->base_oid.name; ++ if (obj_name.size() < RBD_DATA_OBJECT_NAME_LEN) { ++ return false; ++ } ++ ++ if (obj_name.compare(0, RBD_DATA_OBJECT_NAME_FILTER_LEN, RBD_DATA_OBJECT_NAME) == 0){ ++ return true; ++ } ++ ++ return false; ++} ++ ++bool ClientAdaptorMsg::filter_msg_by_op(Objecter::Op *op){ ++ return filter_msg(&op->target); ++} ++ ++ ++bool ClientAdaptorMsg::is_node(uint32_t index){ ++ if (index >> FLAG_OFFSET_BIT){ ++ return true; ++ } ++ return false; ++} ++ ++int32_t ClientAdaptorMsg::get_node_id(int32_t clusterId, string obj_name, int64_t pool_id, uint32_t& pt_id){ ++ if (obj_name.length() == 0){ ++ std::cout << __func__ << " Client Adaptor: input parameter invalid!" << std::endl; ++ return -RET_CCM_PARAM_ERROR; ++ } ++ ++ string obj_id = to_string(pool_id); ++ obj_id += '_'; ++ obj_id += obj_name; ++ hash hash_str; ++ uint32_t obj_hashed_id = hash_str(obj_id); ++ ++ uint32_t pt_num = 0; ++ NodeInfo info = {0}; ++ ++ if (mgr_ref->get_pt_num(clusterId, pt_num)){ ++ std::cout << __func__ << " Client Adaptor: Get PT number failed" << std::endl; ++ return -RET_CCM_PT_NUM_ERROR; ++ } ++ ++ if (pt_num == 0) { ++ std::cout << __func__ << " Client Adaptor: Get PT number zero" << std::endl; ++ return -RET_CCM_PT_NUM_ERROR; ++ } ++ ++ pt_id = obj_hashed_id % pt_num; ++ ++ PTViewPtEntry pt_entry = {0}; ++ if (mgr_ref->get_pt_entry(clusterId, pt_id, &pt_entry)){ ++ std::cout << __func__ << " Client Adaptor: Get PT entry failed" << std::endl; ++ return -RET_CCM_PT_ENTRY_ERROR; ++ } ++ uint32_t node_id = pt_entry.curNodeInfo.nodeId << NODE_ID_OFFSET_BIT; ++ node_id += 0x1 << FLAG_OFFSET_BIT; ++ node_id += clusterId << CLUSTER_ID_OFFSET; ++ ++ if (mgr_ref->get_node_info(clusterId, pt_entry.curNodeInfo.nodeId, &info)){ ++ std::cout << __func__ << " Client Adaptor: Get node info failed. node id " << pt_entry.curNodeInfo.nodeId << std::endl; ++ return -RET_CCM_NODE_INFO_ERROR; ++ } ++ if (info.portNum > PORT_SUPPORT_MAX || info.portNum == 0) { ++ std::cout << __func__ << " Client Adaptor: Port number invalid. Port Number: " << info.portNum << std::endl; ++ return -RET_CCM_PORT_NUM_ERROR; ++ } ++ uint32_t pt_index = pt_entry.indexInNode; ++ ++ node_id += pt_index % info.portNum; ++ ++ return node_id; ++} ++ ++bool ClientAdaptorMsg::valid_ip(string ip_addr) ++{ ++ string regStr = "^((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]\\d|[1-9])"\ ++ "(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]\\d|\\d)){3})|(0.0.0.0)$"; ++ regex regIp(regStr); ++ bool matchValue = regex_match(ip_addr, regIp); ++ return matchValue; ++} ++ ++string ClientAdaptorMsg::gc_snap_prefix() ++{ ++ return GC_SNAP_PREFIX; ++} ++ ++bool ClientAdaptorMsg::is_gc_snap(string snap_name) ++{ ++ int n = snap_name.find(GC_SNAP_PREFIX); ++ if (n == 0) { ++ return true; ++ } ++ return false; ++} ++ ++void ClientAdaptorMsg::gen_random_gc_snap(uint64_t snap_id, int num, string &rd_snap_name) ++{ ++ std::chrono::system_clock::time_point now = std::chrono::system_clock::now(); ++ std::chrono::microseconds ms = std::chrono::duration_cast(now.time_since_epoch()); ++ long timepoint = ms.count(); ++ std::mt19937 generator{std::random_device{}()}; ++ std::uniform_int_distribution distribution{0, 0xFFFF}; ++ uint32_t extra = distribution(generator); ++ rd_snap_name.clear(); ++ rd_snap_name.append(GC_SNAP_PREFIX); ++ rd_snap_name.append(to_string(extra)); ++ rd_snap_name.append("_"); ++ rd_snap_name.append(to_string(snap_id)); ++ rd_snap_name.append("_"); ++ rd_snap_name.append(to_string(num)); ++ rd_snap_name.append("_"); ++ rd_snap_name.append(to_string(timepoint)); ++} ++ ++int32_t ClientAdaptorMsg::get_node_ip(int32_t clusterId, uint32_t node_index, string& node_ip){ ++ uint32_t node_id = (node_index & NODE_ID_MASK) >> NODE_ID_OFFSET_BIT; ++ uint32_t port_index = node_index & PORT_INDEX_MASK; ++ NodeInfo info = {0}; ++ if (mgr_ref->get_node_info(clusterId, node_id, &info)){ ++ std::cout << __func__ << " Client Adaptor: Get node info failed." << std::endl; ++ return RET_CCM_NODE_INFO_ERROR; ++ } ++ uint32_t port = info.ports[port_index]; ++ if (port < GC_PORT_MIN || port > GC_PORT_MAX) { ++ return RET_CCM_PORT_NUM_ERROR; ++ } ++ ++ string server_ip = info.publicAddrStr; ++ ++ if (!valid_ip(server_ip)) { ++ return RET_CCM_IP_ERROR; ++ } ++ ++ string addr_str = "tcp://"; ++ addr_str += server_ip; ++ addr_str += ":"; ++ addr_str += to_string(port); ++ node_ip = addr_str; ++ return RET_OK; ++} ++ ++int32_t ClientAdaptorMsg::get_node_raw_ip(int32_t clusterId, uint32_t node_index, string& node_ip) { ++ uint32_t node_id = (node_index & NODE_ID_MASK) >> NODE_ID_OFFSET_BIT; ++ uint32_t port_index = node_index & PORT_INDEX_MASK; ++ NodeInfo info = {0}; ++ if (mgr_ref->get_node_info(clusterId, node_id, &info)){ ++ std::cout << __func__ << " Client Adaptor: Get node info failed." << std::endl; ++ return RET_CCM_NODE_INFO_ERROR; ++ } ++ uint32_t port = info.ports[port_index]; ++ if (port < GC_PORT_MIN || port > GC_PORT_MAX) { ++ return RET_CCM_PORT_NUM_ERROR; ++ } ++ ++ node_ip = info.publicAddrStr; ++ ++ if (!valid_ip(node_ip)) { ++ return RET_CCM_IP_ERROR; ++ } ++ ++ return RET_OK; ++} ++ ++void ClientAdaptorMsg::set_mgr(ClientAdaptorMgr* mgr){ ++ mgr_ref = mgr; ++ return; ++} ++ ++ClientAdaptorMgr* ClientAdaptorMsg::get_mgr(){ ++ return mgr_ref; ++} ++ ++void das_req_prefetch(DasKvParam *params) ++{ ++ if (params == NULL) { ++ return; ++ } ++ ClientAdaptorMsg *msg_ref = static_cast(params->handle); ++ Objecter *obj = static_cast(params->ctx); ++ if (msg_ref->is_valid_object(obj) ==false) { ++ return; ++ } ++ uint64_t offset = params->offset & SEGMENT_MASK; ++ int id = params->objId; ++ uint64_t left = params->len; ++ while(left) { ++ uint64_t max = std::min(SEGMENT_SIZE - offset, left); ++ ++ char buff[params->imageIdLen+OBJECT_ID_LEN + 1]; ++ snprintf(buff, params->imageIdLen + 1, "%s", params->imageIdBuf); ++ snprintf(buff+params->imageIdLen, OBJECT_ID_LEN+1, "%016x", id); ++ std::string oid_name(buff); ++ uint32_t pt_id; ++ int64_t pool_id = params->cephPoolId; ++ int32_t node_id = msg_ref->get_node_id(params->clusterId, oid_name, pool_id, pt_id); ++ if (node_id < 0) { ++ ceph_abort(); ++ } ++ bufferlist indata; ++ encode(offset, indata); ++ encode(max, indata); ++ msg_ref->push_strategy(obj, params->cephPoolId, node_id, oid_name, indata); ++ ++ left -= max; ++ offset = 0; ++ id++; ++ }; ++} ++ ++int32_t ClientAdaptorMsg::das_init(Objecter *obj) ++{ ++ int32_t rc; ++ das_objs.insert(obj); ++ if (initialized) ++ return 0; ++ DasModuleParam *dasInstanceParam = new DasModuleParam(); ++ DasOPS *regOps = new DasOPS(); ++ regOps->SubmitDasPrefetch = das_req_prefetch; ++ dasInstanceParam->ops = regOps; ++ ++ rc = OpenRcacheCeateDasModule(this, dasInstanceParam); ++ if (rc) { ++ return -1; ++ } ++ initialized = true; ++ return 0; ++} ++ ++int32_t ClientAdaptorMsg::das_update_info(int32_t clusterId, Objecter *obj, Objecter::Op *op) ++{ ++ if (!initialized) ++ return 0; ++ DasKvParam *params[op->ops.size()]; ++ ++ if((op->target.flags & CEPH_OSD_FLAG_WRITE) == CEPH_OSD_FLAG_WRITE) ++ return 0; ++ string obj_name = op->target.base_oid.name; ++ if (obj_name.compare(0, RBD_DATA_OBJECT_NAME_FILTER_LEN, RBD_DATA_OBJECT_NAME)) ++ return 0; ++ std::size_t found = obj_name.find_last_of('.'); ++ if(found == std::string::npos) ++ return -EINVAL; ++ uint64_t objId = std::stol(obj_name.substr(found+1, OBJECT_ID_LEN), nullptr, 16); ++ uint64_t ns = ceph_clock_now().to_nsec(); ++ int i = 0; ++ for(vector::iterator p = op->ops.begin(); p != op->ops.end(); ++p) { ++ if (p->op.op == CEPH_OSD_OP_READ || p->op.op == CEPH_OSD_OP_SPARSE_READ || p->op.op == CEPH_OSD_OP_SYNC_READ) { ++ params[i] = reinterpret_cast(new char[sizeof(DasKvParam) + found + 1]); ++ params[i]->offset = p->op.extent.offset; ++ params[i]->len = p->op.extent.length; ++ params[i]->opcode = 0; ++ params[i]->timeStamp = ns; ++ params[i]->cephPoolId = op->target.base_oloc.pool; ++ params[i]->algType = DAS_ALG_SEQ; ++ params[i]->objId = objId; ++ params[i]->imageIdLen =found + 1; ++ params[i]->clusterId = clusterId; ++ memcpy(params[i]->imageIdBuf, obj_name.c_str(), params[i]->imageIdLen); ++ params[i]->handle = this; ++ params[i]->ctx = obj; ++ i++; ++ } ++ } ++ ++ if (i) { ++ int rc = OpenRcachePutDasInfo(params, i); ++ if (rc) ++ return -1; ++ } ++ return 0; ++} ++ ++int32_t ClientAdaptorMsg::get_node_pool(const int64_t pool_id, int32_t *nodeId) ++{ ++ return mgr_ref->get_node_from_ma(pool_id, nodeId); ++} +diff --git a/src/client_adaptor/ClientAdaptorMsg.h b/src/client_adaptor/ClientAdaptorMsg.h +new file mode 100644 +index 00000000..80afa785 +--- /dev/null ++++ b/src/client_adaptor/ClientAdaptorMsg.h +@@ -0,0 +1,87 @@ ++/* License:LGPL-2.1 ++* ++* Copyright (c) 2021 Huawei Technologies Co., Ltd All rights reserved. ++* ++*/ ++ ++#ifndef CLIENT_ADAPTOR_MSG_H ++#define CLIENT_ADAPTOR_MSG_H ++ ++#include ++#include ++#include ++#include "open_das.h" ++ ++#include "osdc/Objecter.h" ++#include "ClientAdaptorMgr.h" ++ ++class ClientAdaptorMsg { ++public: ++ ClientAdaptorMsg(ClientAdaptorMgr* mgr); ++ ~ClientAdaptorMsg() {}; ++ ++ void push_strategy(Objecter *objecter, uint64_t pool_id, int32_t node_id, std::string oid_name, bufferlist &indata); ++ ++ int32_t das_init(Objecter *obj); ++ ++ void das_remove(Objecter *obj) { ++ das_objs.erase(obj); ++ if (das_objs.empty() && initialized) { ++ initialized = false; ++ OpenRcacheExitDasModule(this); ++ } ++ } ++ ++ bool filter_msg(Objecter::op_target_t *t); ++ ++ bool filter_msg_by_op(Objecter::Op *op); ++ ++ const string name() {return "ClientAdaptorMsg";} ++ ++ bool is_node(uint32_t index); ++ ++ int32_t get_node_id(int32_t clusterId, string obj_name, int64_t pool_id, uint32_t& pt_index); ++ ++ int32_t get_node_ip(int32_t clusterId, uint32_t node_index, string& node_ip); ++ ++ int32_t get_node_raw_ip(int32_t clusterId, uint32_t node_index, string& node_ip); ++ ++ void set_mgr(ClientAdaptorMgr* mgr); ++ ++ ClientAdaptorMgr* get_mgr(void); ++ ++ bool is_valid_object(Objecter *obj) { ++ auto it = das_objs.find(obj); ++ if (it != das_objs.end()) ++ return true; ++ return false; ++ } ++ ++ int32_t das_update_info(int32_t clusterId, Objecter *obj, Objecter::Op *op); ++ ++ int32_t get_node_pool(const int64_t pool_id, int32_t *nodeId); ++ ++ string gc_snap_prefix(); ++ bool is_gc_snap(string snap_name); ++ void gen_random_gc_snap(uint64_t snap_id, int num, string &rd_snap_name); ++ ++protected: ++ ClientAdaptorMgr* mgr_ref; ++ ++ const int FLAG_OFFSET_BIT = 20; ++ const int CLUSTER_ID_OFFSET = 16; ++ const int PORT_INDEX_MASK = 0xf; ++ const int NODE_ID_OFFSET_BIT = 4; ++ const int NODE_ID_MASK = 0xfff0; ++ const int PORT_SUPPORT_MAX = 16; ++private: ++ bool initialized = false; ++ std::set das_objs; ++ bool valid_ip(string ip_addr); ++public: ++ std::unordered_set connections; ++ mutable std::shared_mutex connlock; ++}; ++ ++ ++#endif +diff --git a/src/client_adaptor/ClientAdaptorPerf.cc b/src/client_adaptor/ClientAdaptorPerf.cc +new file mode 100644 +index 00000000..cea78351 +--- /dev/null ++++ b/src/client_adaptor/ClientAdaptorPerf.cc +@@ -0,0 +1,85 @@ ++/* License:LGPL-2.1 ++* ++* Copyright (c) 2021 Huawei Technologies Co., Ltd All rights reserved. ++* ++*/ ++ ++#include "ClientAdaptorPerf.h" ++#include "ClientAdaptorPlugin.h" ++using namespace std; ++ ++#include ++#define gettid() syscall(__NR_gettid) ++ ++void ClientAdaptorPerf::start_tick(Objecter::Op *op) { ++ gettimeofday(&(op->perf_tick.start), NULL); ++ return; ++} ++ ++void ClientAdaptorPerf::end_tick(Objecter::Op *op) { ++ gettimeofday(&(op->perf_tick.end), NULL); ++ return; ++} ++ ++void ClientAdaptorPerf::record_op(Objecter::Op *op) { ++ for (vector::iterator p = op->ops.begin(); p != op->ops.end(); ++p) { ++ if (p->op.op == CEPH_OSD_OP_READ || p->op.op == CEPH_OSD_OP_SPARSE_READ || p->op.op == CEPH_OSD_OP_SYNC_READ) { ++ read.op_count++; ++ read.time_cost += (op->perf_tick.end.tv_sec - op->perf_tick.start.tv_sec) * 1000 * 1000 + \ ++ (op->perf_tick.end.tv_usec - op->perf_tick.start.tv_usec); ++ } else if (p->op.op == CEPH_OSD_OP_WRITE || p->op.op == CEPH_OSD_OP_WRITEFULL) { ++ write.op_count++; ++ write.time_cost += (op->perf_tick.end.tv_sec - op->perf_tick.start.tv_sec) * 1000 * 1000 + \ ++ (op->perf_tick.end.tv_usec-op->perf_tick.start.tv_usec); ++ } ++ } ++ return; ++} ++ ++std::function ClientAdaptorPerf::create_thread(const ClientAdaptorPlugin* in) { ++ const ClientAdaptorPlugin* plugin = in; ++ return [this, plugin](){ ++ char thread_name[16]; ++ sprintf(thread_name, "ca-perf-tick"); ++ pthread_setname_np(pthread_self(), thread_name); ++ uint64_t read_cnt = 0; ++ uint64_t read_cost = 0; ++ uint64_t write_cnt = 0; ++ uint64_t write_cost = 0; ++ uint64_t read_lat = 0xff; ++ uint64_t write_lat = 0xff; ++ float avg_flight = 0; ++ ++ while (!tick_done) { ++ sleep (3); ++ read_cnt = plugin->perf_ref->read.op_count; ++ read_cost = plugin->perf_ref->read.time_cost; ++ write_cnt = plugin->perf_ref->write.op_count; ++ write_cost = plugin->perf_ref->write.time_cost; ++ if (read_cnt != 0) { ++ read_lat = read_cost/read_cnt; ++ } ++ if (write_cnt != 0) { ++ write_lat = write_cost/write_cnt; ++ } ++ if ((read_cnt + write_cnt) != 0 ) { ++ avg_flight = (float)total_in_flight / (float)(read_cnt + write_cnt); ++ } ++ outfile << "*************************************************************************" << std::endl; ++ outfile << "PID: " << getpid() << " TID: " << gettid() << std::endl; ++ outfile << " total_count avg_latency(us)" << std::endl; ++ outfile << "read " << setw(16) << read_cnt << " " << setw(16) << read_lat << std::endl; ++ outfile << "write" << setw(16) << write_cnt << " " << setw(16) << write_lat << std::endl; ++ outfile << " total_count average" << std::endl; ++ outfile << "in-flight" << setw(12) << total_in_flight ++ << " " << setw(16) << fixed << setprecision(1) << avg_flight << std::endl; ++ } ++ }; ++} ++ ++void ClientAdaptorPerf::start_record(ClientAdaptorPlugin* plugin) { ++ std::function perf_thread = create_thread(plugin); ++ threads.push_back(std::thread(perf_thread)); ++ outfile.open("/var/log/ceph/perf_tick.log", ios::out | ios::app); ++ return; ++} +diff --git a/src/client_adaptor/ClientAdaptorPerf.h b/src/client_adaptor/ClientAdaptorPerf.h +new file mode 100644 +index 00000000..f7ee5be5 +--- /dev/null ++++ b/src/client_adaptor/ClientAdaptorPerf.h +@@ -0,0 +1,55 @@ ++/* License:LGPL-2.1 ++* ++* Copyright (c) 2021 Huawei Technologies Co., Ltd All rights reserved. ++* ++*/ ++ ++#ifndef CLIENT_ADAPTOR_PERF_H ++#define CLIENT_ADAPTOR_PERF_H ++ ++#include ++#include ++#include ++#include ++#include ++#include ++#include ++#include ++ ++#include "osdc/Objecter.h" ++ ++class ClientAdaptorPlugin; ++ ++class ClientAdaptorPerf { ++public: ++ ClientAdaptorPerf(){} ++ ~ClientAdaptorPerf(){} ++ ++ ++ struct op_perf_t { ++ std::atomic op_count{0}; ++ std::atomic time_cost{0}; ++ }; ++ ++void start_tick(Objecter::Op *op); ++ ++void end_tick(Objecter::Op *op); ++ ++void record_op(Objecter::Op *op); ++ ++void start_record(ClientAdaptorPlugin* plugin); ++ ++std::function create_thread(const ClientAdaptorPlugin* plugin); ++ ++const string name() {return "ClientAdaptorPerf";} ++vector threads; ++bool tick_done{false}; ++std::ofstream outfile; ++std::atomic total_in_flight{0}; ++private: ++ struct op_perf_t read; ++ struct op_perf_t write; ++ ++}; ++ ++#endif +diff --git a/src/client_adaptor/ClientAdaptorPlugin.cc b/src/client_adaptor/ClientAdaptorPlugin.cc +new file mode 100644 +index 00000000..c44e3480 +--- /dev/null ++++ b/src/client_adaptor/ClientAdaptorPlugin.cc +@@ -0,0 +1,45 @@ ++/* License:LGPL-2.1 ++* ++* Copyright (c) 2021 Huawei Technologies Co., Ltd All rights reserved. ++* ++*/ ++ ++#include ++#include "ClientAdaptorPlugin.h" ++#include "ceph_ver.h" ++#include "ClientAdaptorMsg.h" ++#include "ClientAdaptorMgr.h" ++#include "ClientAdaptorPerf.h" ++ ++ ++ ++ ++ClientAdaptorPlugin::~ClientAdaptorPlugin() { ++ delete mgr_ref; ++ delete msg_ref; ++ delete perf_ref; ++} ++ ++const char *__ceph_plugin_version() ++{ ++ return CEPH_GIT_NICE_VER; ++} ++ ++ ++int __ceph_plugin_init(CephContext *cct, ++ const std::string& type, ++ const std::string& name) ++{ ++ PluginRegistry *instance = cct->get_plugin_registry(); ++ if (cct->_conf.get_val("global_cache_debug_mode")){ ++ ClientAdaptorLocal* ccm = new ClientAdaptorLocal(); ++ ClientAdaptorMsg* msg = new ClientAdaptorMsg(ccm); ++ ClientAdaptorPerf* perf = new ClientAdaptorPerf(); ++ return instance->add(type, name, new ClientAdaptorPlugin(cct, msg, ccm, perf)); ++ } else { ++ ClientAdaptorCcm* ccm = new ClientAdaptorCcm(); ++ ClientAdaptorMsg* msg = new ClientAdaptorMsg(ccm); ++ ClientAdaptorPerf* perf = new ClientAdaptorPerf(); ++ return instance->add(type, name, new ClientAdaptorPlugin(cct, msg, ccm, perf)); ++ } ++} +diff --git a/src/client_adaptor/ClientAdaptorPlugin.h b/src/client_adaptor/ClientAdaptorPlugin.h +new file mode 100644 +index 00000000..fa7d71d2 +--- /dev/null ++++ b/src/client_adaptor/ClientAdaptorPlugin.h +@@ -0,0 +1,39 @@ ++/* License:LGPL-2.1 ++* ++* Copyright (c) 2021 Huawei Technologies Co., Ltd All rights reserved. ++* ++*/ ++ ++#ifndef CLIENT_ADAPTOR_PLUGIN_H ++#define CLIENT_ADAPTOR_PLUGIN_H ++#include ++ ++//#include "ceph_ver.h" ++#include "common/PluginRegistry.h" ++#include "common/ceph_context.h" ++//#include "acconfig.h" ++ ++ ++class ClientAdaptorMsg; ++class ClientAdaptorMgr; ++class ClientAdaptorPerf; ++ ++class ClientAdaptorPlugin : public Plugin { ++public: ++ ClientAdaptorPlugin(CephContext* cct, ClientAdaptorMsg* msg, ClientAdaptorMgr* mgr, ++ ClientAdaptorPerf* perf) : Plugin(cct), msg_ref(msg), mgr_ref(mgr), perf_ref(perf) ++ { ++ } ++ ++ ~ClientAdaptorPlugin(); ++ ++ ClientAdaptorMsg* msg_ref; ++ ClientAdaptorMgr* mgr_ref; ++ ClientAdaptorPerf* perf_ref; ++ ++ const string name() { ++ return "ClientAdaptorPlugin"; ++ } ++}; ++ ++#endif +diff --git a/src/client_adaptor/open_ccm.h b/src/client_adaptor/open_ccm.h +new file mode 100644 +index 00000000..50b8b372 +--- /dev/null ++++ b/src/client_adaptor/open_ccm.h +@@ -0,0 +1,224 @@ ++/* ++ * Copyright (c) 2021 Huawei Technologies Co., Ltd All rights reserved. ++ ++ * Licensed under the Apache License, Version 2.0 (the "License"); ++ * you may not use this file except in compliance with the License. ++ * You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++*/ ++#ifndef __CCM_INTERFACE_H__ ++#define __CCM_INTERFACE_H__ ++ ++#include ++#include ++#include ++ ++#ifdef __cplusplus ++extern "C" { ++#endif ++ ++#define PT_VIEW_NODE_MAX_DOMAIN 32 ++#define PT_VIEW_MAX_POOL 256 ++#define MAX_CCM_CTRL_NODE_NUM 128 ++ ++#define MAX_PT_ENTRY 1024 ++ ++#define MAX_SERVER_NUM MAX_CCM_CTRL_NODE_NUM ++ ++#define CCM_MAX_DISK_NUM 1024 ++#define CCM_VNODE_NUM_PER_NODE 8 ++#define MAX_DISK_NUM_PER_NODE 16 ++#define IP_ADDR_LEN (16) ++#define DISK_NAME_LEN (64) ++#define DISK_SN_LEN (64) ++ ++#define MAX_POOL_NAME_LEN (256) ++#define CCM_MAX_POOL_NUM (4096) ++#define MAX_PORT_NUM (8) ++ ++#define ZK_IP_ADDR_LEN (16) ++#define ZK_DISK_NAME_LEN (64) ++#define ZK_DISK_SN_LEN (64) ++ ++#define CCM_VERSION_0 0 ++#define CCM_VERSION_1 1 ++#define CCM_CURRENT_VERSION CCM_VERSION_0 ++#define CCM_Upgrade_Embedding_LEN 64 ++ ++typedef enum { ++ NODE_STATE_INVALID = 0, ++ NODE_STATE_UP = 1, ++ NODE_STATE_STARTING = 2, ++ NODE_STATE_RUNNING = 3, ++ NODE_STATE_UNWORK = 4, ++ NODE_STATE_DOWN = 5, ++ NODE_STATE_BUTT ++} NodeState; ++ ++typedef enum { ++ VDISK_STATE_DOWN = 0, ++ VDISK_STATE_UP = 1, ++ VIDSK_STATE_BUTT ++} VdiskState; ++ ++typedef struct { ++ uint32_t NodeId; ++ uint32_t ptNum; ++ uint32_t (*ptMap)[2]; ++} NodePtInfo; ++ ++typedef struct { ++ uint32_t nodeId; ++ uint32_t diskId; ++ uint32_t localDiskId; ++ char diskName[DISK_NAME_LEN]; ++ char sn[DISK_SN_LEN]; ++ uint32_t capacity; ++ uint32_t usedCap; ++ VdiskState state; ++ bool isFirstFormat; ++} VdiskInfo; ++ ++typedef struct { ++ uint32_t rackId; ++ uint32_t nodeId; ++ NodeState state; ++ uint32_t ipv4addr; ++ char ipv4AddrStr[IP_ADDR_LEN]; ++ char publicAddrStr[IP_ADDR_LEN]; ++ char clusterAddrStr[IP_ADDR_LEN]; ++ int32_t portNum; ++ uint32_t ports[MAX_PORT_NUM]; ++ uint32_t diskNum; ++ VdiskInfo diskList[MAX_DISK_NUM_PER_NODE]; ++ uint64_t version; ++ NodeState inOutState; ++ NodeState runningState; ++} NodeInfo; ++ ++typedef enum { ++ CCM_PT_STATE_INIT=0, ++ CCM_PT_STATE_OK, ++ CCM_PT_STATE_TRIM, ++ CCM_PT_STATE_REPLAY, ++ CCM_PT_STATE_FAULT, ++} PtState; ++ ++typedef struct { ++ uint32_t nodeId; ++ uint32_t vnodeId; ++} PtSrcNodeInfo; ++ ++typedef struct { ++ uint32_t nodeId; ++ uint32_t diskId; ++ uint32_t vnodeId; ++} PtNodeInfo; ++ ++typedef struct { ++ uint32_t version; ++ bool ptChange; ++ uint32_t birthVersion; ++ uint32_t ptId; ++ uint32_t indexInNode; ++ PtState state; ++ PtNodeInfo curNodeInfo; ++ PtNodeInfo srcNodeInfo; ++ char reserved[CCM_Upgrade_Embedding_LEN]; ++} PtInfo; ++ ++typedef struct _PtView { ++ uint32_t version; ++ uint32_t globalVersion; ++ uint32_t ptNum; ++ char reserved[CCM_Upgrade_Embedding_LEN]; ++ PtInfo ptInfo[0]; ++} PtView; ++ ++typedef PtInfo PTViewPtEntry; ++ ++/* NodeView callback */ ++typedef struct { ++ void *ctx; ++ int32_t (*notifyNodeChange)(NodeInfo *nodeList, uint32_t nodeNum, void *ctx); ++} NodeViewChangeOpHandle; ++ ++/* PTView callback */ ++typedef struct { ++ void *ctx; ++ int32_t (*notifyPtChange)(PTViewPtEntry *entry, uint32_t entryNum, void *ctx); ++} PTViewChangeOpHandle; ++ ++typedef enum { ++ CCM_MODULE_INFRAS = 0, ++ CCM_MODULE_PLOG, ++ CCM_MODULE_INDEX, ++ CCM_MODULE_CACHE, ++ CCM_MODULE_CLIENT, ++ CCM_MODULE_CEPH, ++ CCM_MODULE_BUTT, ++} ModuleType; ++ ++typedef enum { ++ CACHENODE = 0, ++ CLIENTNODE = 1, ++} NodeType; ++ ++typedef enum { ++ IOTYPE_READ = 0, ++ IOTYPE_WRITE = 1, ++} IOType_E; ++ ++typedef struct { ++ int64_t mdPoolId; ++ int64_t dataPoolId; ++ const char *imageId; ++ uint64_t numObjs; ++ uint64_t snapId; ++ uint64_t oldHeadSnapId; ++ uint64_t rollbackSnapId; ++ uint64_t snapSeq; ++} RollbackInfo; ++ ++int32_t OpenAgentInit(int64_t poolId, int32_t *nodeId); ++ ++int32_t OpenBcmInit(void); ++ ++int32_t OpenGetPtEntry(int32_t clusterId, uint32_t ptId, PTViewPtEntry *entry); ++ ++uint32_t OpenGetTotalPtNum(int32_t clusterId); ++ ++int32_t OpenAgentGetNodeInfo(int32_t clusterId, uint32_t nodeId, NodeInfo *nodeInfo); ++ ++int32_t OpenRegisterViewChangeNotifyChain(PTViewChangeOpHandle *handle); ++ ++int32_t OpenRegisterNodeViewChangeNotifyChain(NodeViewChangeOpHandle *handle); ++ ++void OpenDeregisterViewChangeNotifyChain(PTViewChangeOpHandle *handle); ++ ++void OpenDeregisterNodeViewChangeNotifyChain(NodeViewChangeOpHandle *handle); ++ ++int32_t OpenCreateSnapshot(int64_t mdPoolId, int64_t dataPoolId, const char *imageId, uint64_t snapId); ++ ++int32_t OpenDeleteSnapshot(int64_t dataPoolId, const char *nameSpace, const char *imageId, uint64_t snapId); ++ ++int32_t OpenReleaseImageResource(int64_t poolId, const char *imageId); ++ ++int32_t OpenRollbackSnapshot(RollbackInfo *info); ++ ++int32_t OpenImageBusy(int64_t dataPoolId, const char *imageId); ++ ++int32_t OpenSnapshotBusy(int64_t dataPoolId, const char *imageId, int64_t snapId); ++ ++#ifdef __cplusplus ++} ++#endif ++ ++#endif // __CCM_INTERFACE_H__ +\ No newline at end of file +diff --git a/src/client_adaptor/open_das.h b/src/client_adaptor/open_das.h +new file mode 100644 +index 00000000..6eac9f73 +--- /dev/null ++++ b/src/client_adaptor/open_das.h +@@ -0,0 +1,67 @@ ++/* ++ * Copyright (c) 2021 Huawei Technologies Co., Ltd All rights reserved. ++ ++ * Licensed under the Apache License, Version 2.0 (the "License"); ++ * you may not use this file except in compliance with the License. ++ * You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++*/ ++#ifndef OPEN_DAS_H ++#define OPEN_DAS_H ++ ++#include ++#include ++#include ++#include ++ ++typedef enum EnumDasResult { ++ RETURN_DAS_FULL = -2, ++ RETURN_DAS_ERROR = -1, ++ RETURN_DAS_OK = 0, ++ RETURN_DAS_EMPTY = 1, ++ RETURN_DAS_DELETING = 2, ++} DAS_RESULT; ++ ++typedef enum TagDasAlgType { ++ DAS_ALG_SEQ = 0, ++ DAS_ALG_REVERSE_SEQ, ++ DAS_ALG_STRIDE, ++ DAS_ALG_BUTT, ++} DasAlgType; ++ ++typedef struct TagDasKvParam { ++ uint64_t offset; ++ uint64_t len; ++ uint8_t opcode; ++ uint64_t timeStamp; ++ int64_t cephPoolId; ++ DasAlgType algType; ++ uint64_t objId; ++ uint32_t imageIdLen; ++ int32_t clusterId; ++ void *ctx; ++ void *handle; ++ char imageIdBuf[0]; ++} DasKvParam; ++ ++typedef struct TagDasOPS { ++ void (*SubmitDasPrefetch)(DasKvParam* params); ++} DasOPS; ++ ++typedef struct TagDasModuleParam { ++ DasOPS *ops; ++} DasModuleParam; ++ ++int32_t OpenRcacheCeateDasModule(void *handle, DasModuleParam *createInstanceParam); ++ ++int32_t OpenRcachePutDasInfo(DasKvParam *params[], uint32_t keyNum); ++ ++void OpenRcacheExitDasModule(void *handle); ++#endif // OPEN_DAS_H +\ No newline at end of file +diff --git a/src/common/options.cc b/src/common/options.cc +index 8135ea8f..9d3bb781 100644 +--- a/src/common/options.cc ++++ b/src/common/options.cc +@@ -1015,7 +1015,7 @@ std::vector