From f6ec2c621812ce176182c812dc2d514ac0b524d7 Mon Sep 17 00:00:00 2001 From: smjiao Date: Mon, 1 Jul 2024 17:25:43 +0800 Subject: [PATCH] conf sync optimization and add file trace interface --- ...nf-trace-info-and-conf-sync-optimize.patch | 3043 +++++++++++++++++ aops-zeus.spec | 23 +- 2 files changed, 3059 insertions(+), 7 deletions(-) create mode 100644 0012-conf-trace-info-and-conf-sync-optimize.patch diff --git a/0012-conf-trace-info-and-conf-sync-optimize.patch b/0012-conf-trace-info-and-conf-sync-optimize.patch new file mode 100644 index 0000000..ef41204 --- /dev/null +++ b/0012-conf-trace-info-and-conf-sync-optimize.patch @@ -0,0 +1,3043 @@ +diff --git a/ansible_task/inventory/hosts.yml b/ansible_task/inventory/hosts.yml +new file mode 100644 +index 0000000..ca2b55b +--- /dev/null ++++ b/ansible_task/inventory/hosts.yml +@@ -0,0 +1,4 @@ ++all: ++ children: ++ sync: ++ hosts: {} +\ No newline at end of file +diff --git a/ansible_task/playbook_entries/conf_trace.yml b/ansible_task/playbook_entries/conf_trace.yml +new file mode 100644 +index 0000000..60ea52f +--- /dev/null ++++ b/ansible_task/playbook_entries/conf_trace.yml +@@ -0,0 +1,14 @@ ++- name: sync config to host ++ hosts: all ++ remote_user: root ++ gather_facts: no ++ max_fail_percentage: 30 ++ strategy: free ++ vars: ++ - ip: "{{ ip }}" ++ - port: "{{ port }}" ++ - conf_list_str: "{{ conf_list_str }}" ++ - domain_name: "{{ domain_name }}" ++ - host_id: "{{ hostvars[inventory_hostname]['host_id'] }}" ++ roles: ++ - ../roles/conf_trace +\ No newline at end of file +diff --git a/ansible_task/playbook_entries/sync_config.yml b/ansible_task/playbook_entries/sync_config.yml +new file mode 100644 +index 0000000..7819723 +--- /dev/null ++++ b/ansible_task/playbook_entries/sync_config.yml +@@ -0,0 +1,9 @@ ++- name: sync config to host ++ hosts: all ++ remote_user: root ++ gather_facts: no ++ max_fail_percentage: 30 ++ serial: "{{ serial_count }}" ++ strategy: free ++ roles: ++ - ../roles/sync_domain_config +\ No newline at end of file +diff --git a/ansible_task/roles/conf_trace/tasks/main.yml b/ansible_task/roles/conf_trace/tasks/main.yml +new file mode 100644 +index 0000000..446bbfc +--- /dev/null ++++ b/ansible_task/roles/conf_trace/tasks/main.yml +@@ -0,0 +1,49 @@ ++--- ++- name: install dependency ++ dnf: ++ name: python3-libselinux ++ state: present ++ when: action == "start" or action == "update" ++- name: copy ragdoll-filetrace bin ++ copy: ++ src: /usr/bin/ragdoll-filetrace ++ dest: /usr/bin/ragdoll-filetrace ++ owner: root ++ group: root ++ mode: '0755' ++ when: action == "start" or action == "update" ++- name: copy ragdoll-filetrace systemctl service config ++ copy: ++ src: /usr/lib/systemd/system/ragdoll-filetrace.service ++ dest: /usr/lib/systemd/system/ragdoll-filetrace.service ++ owner: root ++ group: root ++ mode: '0755' ++ when: action == "start" or action == "update" ++- name: reload systemctl service config ++ command: systemctl daemon-reload ++ when: action == "start" or action == "update" ++- name: enable ragdoll-filetrace systemd ++ command: systemctl enable ragdoll-filetrace ++ when: action == "start" or action == "update" ++- name: dependency install ++ shell: yum install python3-psutil kernel-devel-$(uname -r) bcc-tools bcc python3-bpfcc python3-requests llvm-12.0.1-4.iss22 llvm-libs-12.0.1-4.iss22 -y ++ when: action == "start" ++- name: Ensure /etc/ragdoll-filetrace directory exists ++ file: ++ path: /etc/ragdoll-filetrace ++ state: directory ++ mode: '0755' ++ when: action == "update" or action == "start" 
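++# The tasks in this role are driven by the `action` extra var passed in by
++# zeus (see ConfTraceMgmt.ansible_conf_trace_mgmt in this patch):
++#   start  - install dependencies, deploy the binary, service file and config,
++#            enable and start the service
++#   update - redeploy the binary, service file and config, then restart
++#            (stop + start) the service
++#   stop   - stop the ragdoll-filetrace service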
++- name: update ragdoll-filetrace config ++ template: ++ src: agith.config.j2 ++ dest: /etc/ragdoll-filetrace/ragdoll-filetrace.conf ++ mode: '0755' ++ when: action == "update" or action == "start" ++- name: stop ragdoll-filetrace when action is update ++ command: systemctl stop ragdoll-filetrace ++ when: action == "update" or action == "stop" ++- name: start ragdoll-filetrace systemd ++ command: systemctl start ragdoll-filetrace ++ when: action == "update" or action == "start" +diff --git a/ansible_task/roles/conf_trace/templates/agith.config.j2 b/ansible_task/roles/conf_trace/templates/agith.config.j2 +new file mode 100644 +index 0000000..c7a1476 +--- /dev/null ++++ b/ansible_task/roles/conf_trace/templates/agith.config.j2 +@@ -0,0 +1,18 @@ ++{ ++ "Repository": { ++ "concern_syscalls": [ ++ "write", ++ "clone", ++ "unlinkat", ++ "unlink", ++ "connect", ++ "sendto", ++ "recvfrom", ++ "mkdir" ++ ], ++ "aops_zeus": "http://{{ ip }}:{{ port }}/conftrace/data", ++ "conf_list": "{{ conf_list_str }}", ++ "domain_name": "{{ domain_name }}", ++ "host_id": "{{ host_id }}" ++ } ++} +\ No newline at end of file +diff --git a/ansible_task/roles/sync_domain_config/tasks/main.yml b/ansible_task/roles/sync_domain_config/tasks/main.yml +new file mode 100644 +index 0000000..29b8857 +--- /dev/null ++++ b/ansible_task/roles/sync_domain_config/tasks/main.yml +@@ -0,0 +1,18 @@ ++--- ++- name: Check if software is installed ++ command: rpm -qi python3-libselinux ++ register: result ++ ignore_errors: yes ++- name: install dependency ++ dnf: ++ name: python3-libselinux ++ state: present ++ when: "'not installed' in result.stdout" ++- name: sync config to host ++ copy: ++ src: "{{ item.key }}" ++ dest: "{{ item.value }}" ++ owner: root ++ group: root ++ mode: '0644' ++ with_dict: "{{ file_path_infos }}" +\ No newline at end of file +diff --git a/aops-zeus.spec b/aops-zeus.spec +index 0d62cd6..badc290 100644 +--- a/aops-zeus.spec ++++ b/aops-zeus.spec +@@ -37,6 +37,7 @@ cp -r database %{buildroot}/opt/aops/ + %files + %doc README.* + %attr(0644,root,root) %{_sysconfdir}/aops/zeus.ini ++%attr(0644,root,root) %{_sysconfdir}/aops/zeus_crontab.yml + %attr(0755,root,root) %{_bindir}/aops-zeus + %attr(0755,root,root) %{_unitdir}/aops-zeus.service + %{python3_sitelib}/aops_zeus*.egg-info +diff --git a/conf/zeus.ini b/conf/zeus.ini +index 945d6b4..c87110f 100644 +--- a/conf/zeus.ini ++++ b/conf/zeus.ini +@@ -36,4 +36,11 @@ port=11112 + + [apollo] + ip=127.0.0.1 +-port=11116 +\ No newline at end of file ++port=11116 ++ ++[serial] ++serial_count=10 ++ ++[update_sync_status] ++update_sync_status_address = "http://127.0.0.1" ++update_sync_status_port = 11114 +\ No newline at end of file +diff --git a/conf/zeus_crontab.yml b/conf/zeus_crontab.yml +new file mode 100644 +index 0000000..d60d306 +--- /dev/null ++++ b/conf/zeus_crontab.yml +@@ -0,0 +1,31 @@ ++# Timed task configuration file specification (YAML): ++ ++# Name of a scheduled task, name should be unique e.g ++# task: download security bulletin ++ ++# Task type, only 'update_config_sync_status' are supported ++# type: update_config_sync_status ++ ++# Whether scheduled tasks are allowed to run ++# enable: true ++ ++# meta info for the task, it's customised for user ++# meta: ++# cvrf_url: https://repo.openeuler.org/security/data/cvrf ++ ++# Timed config, set the scheduled time and polling policy ++# timed: ++# value between 0-6, for example, 0 means Monday, 0-6 means everyday ++# day_of_week: 0-6 ++# value between 0-23, for example, 2 means 2:00 in a day ++# hour: 3 ++# 
Polling strategy, The value can only be 'cron' 'date' 'interval', default value is 'cron' ++# trigger: cron ++ ++- task: update config sync status ++ type: update_config_sync_status_task ++ enable: true ++ timed: ++ day_of_week: 0-6 ++ hour: 3 ++ trigger: cron +\ No newline at end of file +diff --git a/database/zeus.sql b/database/zeus.sql +index b54e931..6d9a722 100644 +--- a/database/zeus.sql ++++ b/database/zeus.sql +@@ -54,4 +54,23 @@ CREATE TABLE IF NOT EXISTS `host` ( + CONSTRAINT `host_ibfk_2` FOREIGN KEY (`host_group_id`) REFERENCES `host_group` (`host_group_id`) ON DELETE RESTRICT ON UPDATE RESTRICT + ) ENGINE = InnoDB AUTO_INCREMENT = 3 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic; + ++CREATE TABLE IF NOT EXISTS `host_conf_sync_status` ( ++ `host_id` int(11) NOT NULL, ++ `host_ip` varchar(16) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL, ++ `domain_name` varchar(16) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL, ++ `sync_status` int(1) unsigned zerofill NULL DEFAULT NULL, ++ CONSTRAINT hd_host_sync PRIMARY KEY (host_id,domain_name), ++ INDEX `sync_status`(`sync_status`) USING BTREE ++) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic; ++ ++CREATE TABLE IF NOT EXISTS `conf_trace_info` ( ++ `UUID` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL, ++ `domain_name` varchar(16) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL, ++ `host_id` int(11) NOT NULL, ++ `conf_name` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL, ++ `info` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL, ++ `create_time` datetime DEFAULT NULL, ++ PRIMARY KEY (`UUID`) USING BTREE ++) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic; ++ + INSERT INTO `user` (`username`, `password`) VALUE('admin', 'pbkdf2:sha256:150000$h1oaTY7K$5b1ff300a896f6f373928294fd8bac8ed6d2a1d6a7c5ea2d2ccd2075e6177896') ON DUPLICATE KEY UPDATE username= 'admin',password='pbkdf2:sha256:150000$h1oaTY7K$5b1ff300a896f6f373928294fd8bac8ed6d2a1d6a7c5ea2d2ccd2075e6177896'; +\ No newline at end of file +diff --git a/setup.py b/setup.py +index 1b45b43..d62ee0c 100644 +--- a/setup.py ++++ b/setup.py +@@ -26,6 +26,7 @@ setup( + author='cmd-lsw-yyy-zyc', + data_files=[ + ('/etc/aops', ['conf/zeus.ini']), ++ ('/etc/aops', ['conf/zeus_crontab.yml']), + ('/usr/lib/systemd/system', ['aops-zeus.service']), + ("/opt/aops/database", ["database/zeus.sql"]), + ], +diff --git a/zeus/conf/constant.py b/zeus/conf/constant.py +index 27aef66..a0e0a98 100644 +--- a/zeus/conf/constant.py ++++ b/zeus/conf/constant.py +@@ -57,9 +57,15 @@ ADD_GROUP = "/manage/host/group/add" + DELETE_GROUP = "/manage/host/group/delete" + GET_GROUP = "/manage/host/group/get" + ++ADD_HOST_SYNC_STATUS = "/manage/host/sync/status/add" ++DELETE_HOST_SYNC_STATUS = "/manage/host/sync/status/delete" ++DELETE_ALL_HOST_SYNC_STATUS = "/manage/all/host/sync/status/delete" ++GET_HOST_SYNC_STATUS = "/manage/host/sync/status/get" ++ + COLLECT_CONFIG = '/manage/config/collect' + SYNC_CONFIG = '/manage/config/sync' + OBJECT_FILE_CONFIG = '/manage/config/objectfile' ++BATCH_SYNC_CONFIG = '/manage/config/batch/sync' + + USER_LOGIN = "/manage/account/login" + LOGOUT = "/manage/account/logout" +@@ -94,6 +100,17 @@ VUL_TASK_CVE_SCAN_NOTICE = "/vulnerability/task/callback/cve/scan/notice" + CHECK_IDENTIFY_SCENE = "/check/scene/identify" + CHECK_WORKFLOW_HOST_EXIST = '/check/workflow/host/exist' + ++# ragdoll ++DOMAIN_LIST_API = "/domain/queryDomain" ++EXPECTED_CONFS_API = 
"/confs/queryExpectedConfs" ++DOMAIN_CONF_DIFF_API = "/confs/domain/diff" ++ ++# conf trace ++CONF_TRACE_MGMT = "/conftrace/mgmt" ++CONF_TRACE_QUERY = "/conftrace/query" ++CONF_TRACE_DATA = "/conftrace/data" ++CONF_TRACE_DELETE = "/conftrace/delete" ++ + # host template file content + HOST_TEMPLATE_FILE_CONTENT = """host_ip,ssh_port,ssh_user,password,ssh_pkey,host_name,host_group_name,management + 127.0.0.1,22,root,password,private key,test_host,test_host_group,FALSE +@@ -106,6 +123,20 @@ HOST_TEMPLATE_FILE_CONTENT = """host_ip,ssh_port,ssh_user,password,ssh_pkey,host + "4. 上传本文件前,请删除此部分提示内容",,,,,,, + """ + ++# ansible sync config ++PARENT_DIRECTORY = "/opt/aops/ansible_task/" ++HOST_PATH_FILE = "/opt/aops/ansible_task/inventory/" ++SYNC_CONFIG_YML = "/opt/aops/ansible_task/playbook_entries/sync_config.yml" ++CONF_TRACE_YML = "/opt/aops/ansible_task/playbook_entries/conf_trace.yml" ++SYNC_LOG_PATH = "/var/log/aops/sync/" ++CONF_TRACE_LOG_PATH = "/var/log/aops/conftrace/" ++KEY_FILE_PREFIX = "/tmp/" ++KEY_FILE_SUFFIX = "_id_dsa" ++IP_START_PATTERN = r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}" ++ ++DIRECTORY_FILE_PATH_LIST = ["/etc/pam.d"] ++ ++TIMED_TASK_CONFIG_PATH = "/etc/aops/zeus_crontab.yml" + + # cve task status + class CveTaskStatus: +diff --git a/zeus/conf/default_config.py b/zeus/conf/default_config.py +index 5e05f64..1e713ae 100644 +--- a/zeus/conf/default_config.py ++++ b/zeus/conf/default_config.py +@@ -32,7 +32,10 @@ apollo = {"IP": "127.0.0.1", "PORT": 11116} + + redis = {"IP": "127.0.0.1", "PORT": 6379} + +- + prometheus = {"IP": "127.0.0.1", "PORT": 9090, "QUERY_RANGE_STEP": "15s"} + + agent = {"DEFAULT_INSTANCE_PORT": 8888} ++ ++serial = {"SERIAL_COUNT": 10} ++ ++update_sync_status = {"UPDATE_SYNC_STATUS_ADDRESS": "http://127.0.0.1", "UPDATE_SYNC_STATUS_PORT": 11114} +diff --git a/zeus/config_manager/view.py b/zeus/config_manager/view.py +index b012c62..28c9d32 100644 +--- a/zeus/config_manager/view.py ++++ b/zeus/config_manager/view.py +@@ -15,17 +15,29 @@ Time: + Author: + Description: Restful APIs for host + """ ++import glob + import json + import os ++import queue ++import subprocess ++import threading ++import time ++from configparser import RawConfigParser + from typing import List, Dict + ++import yaml ++from vulcanus import LOGGER + from vulcanus.multi_thread_handler import MultiThreadHandler + from vulcanus.restful.resp import state + from vulcanus.restful.response import BaseResponse +-from zeus.conf.constant import CERES_COLLECT_FILE, CERES_SYNC_CONF, CERES_OBJECT_FILE_CONF ++ ++from zeus.conf import configuration ++from zeus.conf.constant import CERES_COLLECT_FILE, CERES_SYNC_CONF, CERES_OBJECT_FILE_CONF, SYNC_LOG_PATH, \ ++ HOST_PATH_FILE, SYNC_CONFIG_YML, PARENT_DIRECTORY, IP_START_PATTERN, KEY_FILE_PREFIX, KEY_FILE_SUFFIX + from zeus.database.proxy.host import HostProxy + from zeus.function.model import ClientConnectArgs +-from zeus.function.verify.config import CollectConfigSchema, SyncConfigSchema, ObjectFileConfigSchema ++from zeus.function.verify.config import CollectConfigSchema, SyncConfigSchema, ObjectFileConfigSchema, \ ++ BatchSyncConfigSchema + from zeus.host_manager.ssh import execute_command_and_parse_its_result, execute_command_sftp_result + + +@@ -147,7 +159,7 @@ class CollectConfig(BaseResponse): + + return file_content + +- @BaseResponse.handle(schema=CollectConfigSchema, token=False) ++ @BaseResponse.handle(schema=CollectConfigSchema, token=True) + def post(self, **param): + """ + Get config +@@ -201,16 +213,16 @@ class 
CollectConfig(BaseResponse): + file_content = self.convert_host_id_to_failed_data_format( + list(host_id_with_config_file.keys()), host_id_with_config_file + ) +- return self.response(code=state.DATABASE_CONNECT_ERROR, data={"resp": file_content}) ++ return self.response(code=state.DATABASE_CONNECT_ERROR, data=file_content) + + status, host_list = proxy.get_host_info( +- {"username": "admin", "host_list": list(host_id_with_config_file.keys())}, True ++ {"username": param.get("username"), "host_list": list(host_id_with_config_file.keys())}, True + ) + if status != state.SUCCEED: + file_content = self.convert_host_id_to_failed_data_format( + list(host_id_with_config_file.keys()), host_id_with_config_file + ) +- return self.response(code=status, data={"resp": file_content}) ++ return self.response(code=status, data=file_content) + # Get file content + tasks = [(host, host_id_with_config_file[host["host_id"]]) for host in host_list] + multi_thread = MultiThreadHandler(lambda data: self.get_file_content(*data), tasks, None) +@@ -262,7 +274,7 @@ class SyncConfig(BaseResponse): + host_info.get("ssh_user"), host_info.get("pkey")), command) + return status + +- @BaseResponse.handle(schema=SyncConfigSchema, token=False) ++ @BaseResponse.handle(schema=SyncConfigSchema, token=True) + def put(self, **params): + + sync_config_info = dict() +@@ -277,19 +289,19 @@ class SyncConfig(BaseResponse): + # Query host address from database + proxy = HostProxy() + if not proxy.connect(): +- return self.response(code=state.DATABASE_CONNECT_ERROR, data={"resp": sync_result}) ++ return self.response(code=state.DATABASE_CONNECT_ERROR, data=sync_result) + + status, host_list = proxy.get_host_info( +- {"username": "admin", "host_list": [params.get('host_id')]}, True) ++ {"username": params.get("username"), "host_list": [params.get('host_id')]}, True) + if status != state.SUCCEED: +- return self.response(code=status, data={"resp": sync_result}) ++ return self.response(code=status, data=sync_result) + + host_info = host_list[0] + status = self.sync_config_content(host_info, sync_config_info) + if status == state.SUCCEED: + sync_result['sync_result'] = True +- return self.response(code=state.SUCCEED, data={"resp": sync_result}) +- return self.response(code=state.UNKNOWN_ERROR, data={"resp": sync_result}) ++ return self.response(code=state.SUCCEED, data=sync_result) ++ return self.response(code=state.UNKNOWN_ERROR, data=sync_result) + + + class ObjectFileConfig(BaseResponse): +@@ -302,7 +314,7 @@ class ObjectFileConfig(BaseResponse): + host_info.get("ssh_user"), host_info.get("pkey")), command) + return status, content + +- @BaseResponse.handle(schema=ObjectFileConfigSchema, token=False) ++ @BaseResponse.handle(schema=ObjectFileConfigSchema, token=True) + def post(self, **params): + object_file_result = { + "object_file_paths": list(), +@@ -311,12 +323,12 @@ class ObjectFileConfig(BaseResponse): + # Query host address from database + proxy = HostProxy() + if not proxy.connect(): +- return self.response(code=state.DATABASE_CONNECT_ERROR, data={"resp": object_file_result}) ++ return self.response(code=state.DATABASE_CONNECT_ERROR, data=object_file_result) + + status, host_list = proxy.get_host_info( +- {"username": "admin", "host_list": [params.get('host_id')]}, True) ++ {"username": params.get("username"), "host_list": [params.get('host_id')]}, True) + if status != state.SUCCEED: +- return self.response(code=status, data={"resp": object_file_result}) ++ return self.response(code=status, data=object_file_result) + + host_info 
= host_list[0]
+        status, content = self.object_file_config_content(host_info, params.get('file_directory'))
+@@ -326,5 +338,206 @@ class ObjectFileConfig(BaseResponse):
+            if content_res.get("resp"):
+                resp = content_res.get("resp")
+                object_file_result['object_file_paths'] = resp
+-            return self.response(code=state.SUCCEED, data={"resp": object_file_result})
+-        return self.response(code=state.UNKNOWN_ERROR, data={"resp": object_file_result})
++            return self.response(code=state.SUCCEED, data=object_file_result)
++        return self.response(code=state.UNKNOWN_ERROR, data=object_file_result)
++
++
++class BatchSyncConfig(BaseResponse):
++    @staticmethod
++    def run_subprocess(cmd, result_queue):
++        try:
++            completed_process = subprocess.run(cmd, cwd=PARENT_DIRECTORY, shell=True, capture_output=True, text=True)
++            result_queue.put(completed_process)
++        except subprocess.CalledProcessError as ex:
++            result_queue.put(ex)
++
++    @staticmethod
++    def ansible_handler(now_time, ansible_forks, extra_vars, HOST_FILE):
++        if not os.path.exists(SYNC_LOG_PATH):
++            os.makedirs(SYNC_LOG_PATH)
++
++        SYNC_LOG = SYNC_LOG_PATH + "sync_config_" + now_time + ".log"
++        cmd = f"ansible-playbook -f {ansible_forks} -e '{extra_vars}' " \
++              f"-i {HOST_FILE} {SYNC_CONFIG_YML} |tee {SYNC_LOG} "
++        result_queue = queue.Queue()
++        thread = threading.Thread(target=BatchSyncConfig.run_subprocess, args=(cmd, result_queue))
++        thread.start()
++
++        thread.join()
++        try:
++            completed_process = result_queue.get(block=False)
++            if isinstance(completed_process, subprocess.CalledProcessError):
++                LOGGER.error("ansible subprocess error: %s", completed_process)
++            else:
++                if completed_process.returncode == 0:
++                    return completed_process.stdout
++                else:
++                    LOGGER.error("ansible subprocess error: %s", completed_process)
++        except queue.Empty:
++            LOGGER.error("ansible subprocess returned no result")
++
++    @staticmethod
++    def ansible_sync_domain_config_content(host_list: list, file_path_infos: list):
++        # initialize parameters and the per-host result container
++        now_time = str(int(time.time()))
++        host_ip_sync_result = {}
++        BatchSyncConfig.generate_config(host_list, host_ip_sync_result, now_time)
++
++        ansible_forks = len(host_list)
++        # read the batch size (serial_count) loaded into memory
++        # from the configuration file
++        serial_count = configuration.serial.get("SERIAL_COUNT")
++        # map temporary source files to their target paths on the hosts
++        path_infos = {}
++        for file_info in file_path_infos:
++            file_path = file_info.get("file_path")
++            file_content = file_info.get("content")
++            # write the file content to a temporary file
++            src_file_path = "/tmp/" + os.path.basename(file_path)
++            with open(src_file_path, "w", encoding="UTF-8") as f:
++                f.write(file_content)
++            path_infos[src_file_path] = file_path
++
++        # invoke ansible-playbook
++        extra_vars = json.dumps({"serial_count": serial_count, "file_path_infos": path_infos})
++        try:
++            HOST_FILE = HOST_PATH_FILE + "hosts_" + now_time + ".yml"
++            result = BatchSyncConfig.ansible_handler(now_time, ansible_forks, extra_vars, HOST_FILE)
++        except Exception as ex:
++            LOGGER.error("ansible playbook execute error: %s", ex)
++            return host_ip_sync_result
++
++        processor_result = result.splitlines()
++        char_to_filter = 'item='
++        filtered_list = [item for item in processor_result if char_to_filter in item]
++        if not filtered_list:
++            return host_ip_sync_result
++        for line in filtered_list:
++            start_index = line.find("[") + 1
++            end_index = line.find("]", start_index)
++            ip_port = line[start_index:end_index]
++            sync_results = host_ip_sync_result.get(ip_port)
++
++            start_index1 = line.find("{")
++            end_index1 = line.find(")", start_index1)
++            path_str = line[start_index1:end_index1]
++            file_path = json.loads(path_str.replace("'", "\"")).get("value")
++            if line.startswith("ok:") or line.startswith("changed:"):
++                signal_file_sync = {
++                    "filePath": file_path,
++                    "result": "SUCCESS"
++                }
++            else:
++                signal_file_sync = {
++                    "filePath": file_path,
++                    "result": "FAIL"
++                }
++            sync_results.append(signal_file_sync)
++        # clean up intermediate files
++        try:
++            # remove the temporary *_id_dsa key files under /tmp
++            file_pattern = "*id_dsa"
++            tmp_files_to_delete = glob.glob(os.path.join(KEY_FILE_PREFIX, file_pattern))
++            for tmp_file_path in tmp_files_to_delete:
++                os.remove(tmp_file_path)
++
++            # remove the temporary path_infos source files written under /tmp
++            for path in path_infos.keys():
++                os.remove(path)
++
++            # remove the temporary inventory file under HOST_PATH_FILE
++            os.remove(HOST_FILE)
++        except OSError as ex:
++            LOGGER.error("remove file error: %s", ex)
++        return host_ip_sync_result
++
++    @staticmethod
++    def generate_config(host_list, host_ip_sync_result, now_time):
++        # collect each host_ip and feed it into the ansible inventory
++        hosts = {
++            "all": {
++                "children": {
++                    "sync": {
++                        "hosts": {
++
++                        }
++                    }
++                }
++            }
++        }
++
++        for host in host_list:
++            # generate a temporary private key file for ansible to access the remote host
++            key_file_path = KEY_FILE_PREFIX + host['host_ip'] + KEY_FILE_SUFFIX
++            with open(key_file_path, 'w', encoding="UTF-8") as keyfile:
++                os.chmod(key_file_path, 0o600)
++                keyfile.write(host['pkey'])
++            host_ip = host['host_ip']
++            host_vars = {
++                "ansible_host": host_ip,
++                "ansible_ssh_user": "root",
++                "ansible_ssh_private_key_file": key_file_path,
++                "ansible_ssh_port": host['ssh_port'],
++                "ansible_python_interpreter": "/usr/bin/python3",
++                "host_key_checking": False,
++                "interpreter_python": "auto_legacy_silent",
++                "become": True,
++                "become_method": "sudo",
++                "become_user": "root",
++                "become_ask_pass": False,
++                "ssh_args": "-C -o ControlMaster=auto -o ControlPersist=60s StrictHostKeyChecking=no"
++            }
++
++            hosts['all']['children']['sync']['hosts'][host_ip + "_" + str(host['ssh_port'])] = host_vars
++            # initialize the result entry for this host
++            host_ip_sync_result[host['host_ip'] + "_" + str(host['ssh_port'])] = list()
++        HOST_FILE = HOST_PATH_FILE + "hosts_" + now_time + ".yml"
++        with open(HOST_FILE, 'w') as outfile:
++            yaml.dump(hosts, outfile, default_flow_style=False)
++
++    @staticmethod
++    def ini2json(ini_path):
++        json_data = {}
++        cfg = RawConfigParser()
++        cfg.read(ini_path)
++        for s in cfg.sections():
++            json_data[s] = dict(cfg.items(s))
++        return json_data
++
++    @BaseResponse.handle(schema=BatchSyncConfigSchema, proxy=HostProxy, token=True)
++    def put(self, callback: HostProxy, **params):
++        # initialize the response
++        file_path_infos = params.get('file_path_infos')
++        host_ids = params.get('host_ids')
++        sync_result = list()
++        # Query host address from database
++        if not callback.connect():
++            return self.response(code=state.DATABASE_CONNECT_ERROR, data=sync_result)
++
++        # verify the token
++        status, host_list = callback.get_host_info(
++            # the token check resolves the requesting user
++            {"username": params.get("username"), "host_list": host_ids}, True)
++        if status != state.SUCCEED:
++            return self.response(code=status, data=sync_result)
++
++        # map each host_ip/port key to its host_id
++        host_id_ip_dict = dict()
++        if host_list:
++            for host in host_list:
++                key = host['host_ip'] + "_" + str(host['ssh_port'])
++                host_id_ip_dict[key] = host['host_id']
++
++        host_ip_sync_result = self.ansible_sync_domain_config_content(host_list, file_path_infos)
++
++        if not host_ip_sync_result:
++            return self.response(code=state.EXECUTE_COMMAND_ERROR, data=sync_result)
++        # convert the per-ip results into per-host-id results
++        for key, value in host_ip_sync_result.items():
++            host_id = host_id_ip_dict.get(key)
++            single_result = {
++                "host_id": host_id,
++                "syncResult": value
++            }
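++            # each entry pairs a host_id with the per-file results parsed from
++            # the ansible output, e.g.
++            # {"host_id": 1, "syncResult": [{"filePath": "/etc/hostname", "result": "SUCCESS"}]}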
++            sync_result.append(single_result)
++        return self.response(code=state.SUCCEED, data=sync_result)
+diff --git a/zeus/conftrace_manage/__init__.py b/zeus/conftrace_manage/__init__.py
+new file mode 100644
+index 0000000..0470fef
+--- /dev/null
++++ b/zeus/conftrace_manage/__init__.py
+@@ -0,0 +1,18 @@
++#!/usr/bin/python3
++# ******************************************************************************
++# Copyright (C) 2023 isoftstone Technologies Co., Ltd. All rights reserved.
++# licensed under the Mulan PSL v2.
++# You can use this software according to the terms and conditions of the Mulan PSL v2.
++# You may obtain a copy of Mulan PSL v2 at:
++#     http://license.coscl.org.cn/MulanPSL2
++# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
++# PURPOSE.
++# See the Mulan PSL v2 for more details.
++# ******************************************************************************/
++"""
++@FileName: __init__.py
++@Time: 2024/4/19 9:21
++@Author: JiaoSiMao
++Description:
++"""
+diff --git a/zeus/conftrace_manage/view.py b/zeus/conftrace_manage/view.py
+new file mode 100644
+index 0000000..a1faffc
+--- /dev/null
++++ b/zeus/conftrace_manage/view.py
+@@ -0,0 +1,279 @@
++#!/usr/bin/python3
++# ******************************************************************************
++# Copyright (C) 2023 isoftstone Technologies Co., Ltd. All rights reserved.
++# licensed under the Mulan PSL v2.
++# You can use this software according to the terms and conditions of the Mulan PSL v2.
++# You may obtain a copy of Mulan PSL v2 at:
++#     http://license.coscl.org.cn/MulanPSL2
++# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
++# PURPOSE.
++# See the Mulan PSL v2 for more details.
++# ******************************************************************************/
++"""
++Time:
++Author:
++Description: Restful APIs for conf trace
++"""
++import glob
++import json
++import os
++import queue
++import subprocess
++import threading
++import time
++
++import yaml
++from vulcanus import LOGGER
++from vulcanus.restful.resp import state
++from vulcanus.restful.resp.state import SUCCEED, SERVER_ERROR
++from vulcanus.restful.response import BaseResponse
++
++from zeus.conf import configuration
++from zeus.conf.constant import KEY_FILE_PREFIX, KEY_FILE_SUFFIX, HOST_PATH_FILE, CONF_TRACE_LOG_PATH, \
++    PARENT_DIRECTORY, CONF_TRACE_YML
++from zeus.database.proxy.conf_trace import ConfTraceProxy
++from zeus.database.proxy.host import HostProxy
++from zeus.function.verify.conf_trace import ConfTraceMgmtSchema, ConfTraceDataSchema, ConfTraceQuerySchema, \
++    ConfTraceDataDeleteSchema
++
++
++class ConfTraceMgmt(BaseResponse):
++    """
++    Interface for managing the ragdoll-filetrace service on remote hosts.
++    Restful API: put
++    """
++
++    @staticmethod
++    def parse_result(action, result, host_ip_trace_result, HOST_FILE):
++        code_num = SUCCEED
++        code_string = f"{action} ragdoll-filetrace succeeded"
++        processor_result = result.splitlines()
++        char_to_filter = 'unreachable='
++        filtered_list = [item for item in processor_result if char_to_filter in item]
++        if not filtered_list:
++            code_num = SERVER_ERROR
++            code_string = f"{action} ragdoll-filetrace error, no result"
++        for line in filtered_list:
++            result_start_index = line.find(":")
++            ip_port = line[0:result_start_index]
++            result_str = line[result_start_index:]
++            if "unreachable=0" in result_str and "failed=0" in result_str:
++                host_ip_trace_result[ip_port.strip()] = True
++            else:
++                host_ip_trace_result[ip_port.strip()] = False
++
++        # clean up intermediate files
++        try:
++            # remove the temporary *_id_dsa key files under /tmp
++            dsa_file_pattern = "*id_dsa"
++            dsa_tmp_files_to_delete = glob.glob(os.path.join(KEY_FILE_PREFIX, dsa_file_pattern))
++            for dsa_tmp_file_path in dsa_tmp_files_to_delete:
++                os.remove(dsa_tmp_file_path)
++
++            # remove the temporary inventory file under HOST_PATH_FILE
++            os.remove(HOST_FILE)
++        except OSError as ex:
++            LOGGER.error("remove file error: %s", ex)
++        return code_num, code_string
++
++    @staticmethod
++    def run_subprocess(cmd, result_queue):
++        try:
++            completed_process = subprocess.run(cmd, cwd=PARENT_DIRECTORY, shell=True, capture_output=True, text=True)
++            result_queue.put(completed_process)
++        except subprocess.CalledProcessError as ex:
++            result_queue.put(ex)
++
++    @staticmethod
++    def ansible_handler(now_time, ansible_forks, extra_vars, HOST_FILE):
++        if not os.path.exists(CONF_TRACE_LOG_PATH):
++            os.makedirs(CONF_TRACE_LOG_PATH)
++
++        CONF_TRACE_LOG = CONF_TRACE_LOG_PATH + "conf_trace_" + now_time + ".log"
++
++        cmd = f"ansible-playbook -f {ansible_forks} -e '{extra_vars}' " \
++              f"-i {HOST_FILE} {CONF_TRACE_YML} |tee {CONF_TRACE_LOG} "
++        result_queue = queue.Queue()
++        thread = threading.Thread(target=ConfTraceMgmt.run_subprocess, args=(cmd, result_queue))
++        thread.start()
++
++        thread.join()
++        try:
++            completed_process = result_queue.get(block=False)
++            if isinstance(completed_process, subprocess.CalledProcessError):
++                LOGGER.error("ansible subprocess error: %s", completed_process)
++            else:
++                if completed_process.returncode == 0:
++                    return completed_process.stdout
++                else:
++                    LOGGER.error("ansible subprocess error: %s", completed_process)
++        except queue.Empty:
++            LOGGER.error("ansible subprocess returned no result")
++
++    @staticmethod
++    def generate_config(host_list, now_time, conf_files, host_ip_trace_result, domain_name):
++        # collect each host_ip and feed it into the ansible inventory
++        hosts = {
++            "all": {
++                "children": {
++                    "sync": {
++                        "hosts": {
++
++                        }
++                    }
++                }
++            }
++        }
++
++        for host in host_list:
++            # generate a temporary private key file for ansible to access the remote host
++            key_file_path = KEY_FILE_PREFIX + host['host_ip'] + KEY_FILE_SUFFIX
++            with open(key_file_path, 'w', encoding="UTF-8") as keyfile:
++                os.chmod(key_file_path, 0o600)
++                keyfile.write(host['pkey'])
++            host_ip = host['host_ip']
++            host_vars = {
++                "ansible_host": host_ip,
++                "ansible_ssh_user": "root",
++                "ansible_ssh_private_key_file": key_file_path,
++                "ansible_ssh_port": host['ssh_port'],
++                "ansible_python_interpreter": "/usr/bin/python3",
++                "host_key_checking": False,
++                "interpreter_python": "auto_legacy_silent",
++                "become": True,
++                "become_method": "sudo",
++                "become_user": "root",
++                "become_ask_pass": False,
++                "ssh_args": "-C -o ControlMaster=auto -o 
ControlPersist=60s StrictHostKeyChecking=no", ++ "host_id": host['host_id'] ++ } ++ ++ hosts['all']['children']['sync']['hosts'][host_ip + "_" + str(host['ssh_port'])] = host_vars ++ # 初始化结果 ++ host_ip_trace_result[host['host_ip'] + "_" + str(host['ssh_port'])] = True ++ ++ HOST_FILE = HOST_PATH_FILE + "hosts_" + now_time + ".yml" ++ with open(HOST_FILE, 'w') as outfile: ++ yaml.dump(hosts, outfile, default_flow_style=False) ++ ++ @staticmethod ++ def ansible_conf_trace_mgmt(host_list: list, action: str, conf_files: list, domain_name: str): ++ now_time = str(int(time.time())) ++ host_ip_trace_result = {} ++ ConfTraceMgmt.generate_config(host_list, now_time, conf_files, host_ip_trace_result, domain_name) ++ ansible_forks = len(host_list) ++ # 配置文件中读取并发数量 ++ # 从内存中获取serial_count ++ # serial_count = configuration.serial.get("SERIAL_COUNT") ++ # 组装ansible执行的extra参数 ++ ip = configuration.zeus.get('IP') ++ port = configuration.zeus.get("PORT") ++ if conf_files: ++ conf_list_str = ",".join(conf_files) ++ else: ++ conf_list_str = "" ++ extra_vars = f"action={action} ip={ip} port={port} conf_list_str={conf_list_str} " \ ++ f"domain_name={domain_name} " ++ # 调用ansible ++ try: ++ HOST_FILE = HOST_PATH_FILE + "hosts_" + now_time + ".yml" ++ result = ConfTraceMgmt.ansible_handler(now_time, ansible_forks, extra_vars, HOST_FILE) ++ except Exception as ex: ++ LOGGER.error("ansible playbook execute error:", ex) ++ conf_trace_mgmt_result = "ragdoll-filetrace ansible playbook execute error" ++ return SERVER_ERROR, conf_trace_mgmt_result, host_ip_trace_result ++ # 根据action解析每个result ++ code_num, code_string = ConfTraceMgmt.parse_result(action, result, host_ip_trace_result, HOST_FILE) ++ return code_num, code_string, host_ip_trace_result ++ ++ @BaseResponse.handle(schema=ConfTraceMgmtSchema, proxy=HostProxy, token=True) ++ def put(self, callback: HostProxy, **params): ++ host_ids = params.get("host_ids") ++ action = params.get("action") ++ conf_files = params.get("conf_files") ++ domain_name = params.get("domain_name") ++ ++ # 根据id获取host信息 ++ # Query host address from database ++ if not callback.connect(): ++ return self.response(code=state.DATABASE_CONNECT_ERROR, message="database connect error") ++ ++ # 校验token ++ status, host_list = callback.get_host_info( ++ # 校验token 拿到用户 ++ {"username": params.get("username"), "host_list": host_ids}, True) ++ if status != state.SUCCEED: ++ return self.response(code=status, message="get host info error") ++ ++ # 组装ansible外部数据 ++ code_num, code_string, host_ip_trace_result = self.ansible_conf_trace_mgmt(host_list, action, conf_files, ++ domain_name) ++ return self.response(code=code_num, message=code_string, data=host_ip_trace_result) ++ ++ ++class ConfTraceData(BaseResponse): ++ @staticmethod ++ def validate_conf_trace_info(params: dict): ++ """ ++ query conf trace info, validate that the host sync status info is valid ++ return host object ++ ++ Args: ++ params (dict): e.g ++ { ++ "domain_name": "aops", ++ "host_id": 1, ++ "conf_name": "/etc/hostname", ++ "info": "" ++ } ++ ++ Returns: ++ tuple: ++ status code, host sync status object ++ """ ++ # 检查host 是否存在 ++ host_proxy = HostProxy() ++ if not host_proxy.connect(): ++ LOGGER.error("Connect to database error") ++ return state.DATABASE_CONNECT_ERROR, {} ++ data = {"host_list": [params.get("host_id")]} ++ code_num, result_list = host_proxy.get_host_info_by_host_id(data) ++ if code_num != SUCCEED: ++ LOGGER.error("query host info error") ++ return state.DATABASE_QUERY_ERROR, {} ++ if len(result_list) == 0: ++ return 
state.NO_DATA, [] ++ return code_num, result_list ++ ++ @BaseResponse.handle(schema=ConfTraceDataSchema, proxy=ConfTraceProxy, token=False) ++ def post(self, callback: ConfTraceProxy, **params): ++ # 校验hostId是否存在 ++ code_num, result_list = self.validate_conf_trace_info(params) ++ if code_num != SUCCEED or len(result_list) == 0: ++ return self.response(code=SERVER_ERROR, message="request param host id does not exist") ++ ++ status_code = callback.add_conf_trace_info(params) ++ if status_code != state.SUCCEED: ++ return self.response(code=SERVER_ERROR, message="Failed to upload data, service error") ++ return self.response(code=SUCCEED, message="Succeed to upload conf trace info data") ++ ++ ++class ConfTraceQuery(BaseResponse): ++ @BaseResponse.handle(schema=ConfTraceQuerySchema, proxy=ConfTraceProxy, token=True) ++ def post(self, callback: ConfTraceProxy, **params): ++ status_code, result = callback.query_conf_trace_info(params) ++ if status_code != SUCCEED: ++ return self.response(code=SERVER_ERROR, message="Failed to query data, service error") ++ return self.response(code=SUCCEED, message="Succeed to query conf trace info data", data=result) ++ ++ ++class ConfTraceDataDelete(BaseResponse): ++ @BaseResponse.handle(schema=ConfTraceDataDeleteSchema, proxy=ConfTraceProxy, token=True) ++ def post(self, callback: ConfTraceProxy, **params): ++ status_code = callback.delete_conf_trace_info(params) ++ if status_code != state.SUCCEED: ++ return self.response(code=SERVER_ERROR, message="Failed to delete data, service error") ++ return self.response(code=SUCCEED, message="Succeed to delete conf trace info data") +diff --git a/zeus/cron/__init__.py b/zeus/cron/__init__.py +new file mode 100644 +index 0000000..377a23d +--- /dev/null ++++ b/zeus/cron/__init__.py +@@ -0,0 +1,21 @@ ++#!/usr/bin/python3 ++# ****************************************************************************** ++# Copyright (c) Huawei Technologies Co., Ltd. 2021-2022. All rights reserved. ++# licensed under the Mulan PSL v2. ++# You can use this software according to the terms and conditions of the Mulan PSL v2. ++# You may obtain a copy of Mulan PSL v2 at: ++# http://license.coscl.org.cn/MulanPSL2 ++# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR ++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR ++# PURPOSE. ++# See the Mulan PSL v2 for more details. ++# ******************************************************************************/ ++""" ++@FileName: __init__.py.py ++@Time: 2024/3/5 16:55 ++@Author: JiaoSiMao ++Description: ++""" ++from zeus.cron.update_config_sync_status_task import UpdateConfigSyncStatusTask ++ ++task_meta = {"update_config_sync_status_task": UpdateConfigSyncStatusTask} +diff --git a/zeus/cron/update_config_sync_status_task.py b/zeus/cron/update_config_sync_status_task.py +new file mode 100644 +index 0000000..c836291 +--- /dev/null ++++ b/zeus/cron/update_config_sync_status_task.py +@@ -0,0 +1,267 @@ ++#!/usr/bin/python3 ++# ****************************************************************************** ++# Copyright (c) Huawei Technologies Co., Ltd. 2021-2022. All rights reserved. ++# licensed under the Mulan PSL v2. ++# You can use this software according to the terms and conditions of the Mulan PSL v2. 
++# You may obtain a copy of Mulan PSL v2 at: ++# http://license.coscl.org.cn/MulanPSL2 ++# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR ++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR ++# PURPOSE. ++# See the Mulan PSL v2 for more details. ++# ******************************************************************************/ ++""" ++@FileName: update_config_sync_status_task.py ++@Time: 2024/3/5 16:56 ++@Author: JiaoSiMao ++Description: ++""" ++import json ++ ++import requests ++from vulcanus.log.log import LOGGER ++from vulcanus.timed import TimedTask ++from vulcanus.database.helper import make_mysql_engine_url, create_database_engine ++from vulcanus.database.proxy import MysqlProxy ++from vulcanus.restful.resp import state ++from vulcanus.restful.resp.state import SUCCEED ++ ++from zeus.conf import configuration ++from zeus.conf.constant import DIRECTORY_FILE_PATH_LIST ++from zeus.config_manager.view import ObjectFileConfig, CollectConfig ++from zeus.database.proxy.host import HostProxy ++from zeus.database.proxy.host_sync_status import HostSyncProxy ++from zeus.utils.conf_tools import ConfTools ++ ++ ++class UpdateConfigSyncStatusTask(TimedTask): ++ @staticmethod ++ def get_domain_files(domain_paths: dict, expected_confs_resp: list): ++ # 获取domain中要获取文件内容的文件路径 ++ for domain_confs in expected_confs_resp: ++ domain_name = domain_confs.get("domainName") ++ conf_base_infos = domain_confs.get("confBaseInfos") ++ file_list = [] ++ if conf_base_infos: ++ for conf_info in conf_base_infos: ++ file_list.append(conf_info.get("filePath")) ++ domain_paths[domain_name] = file_list ++ ++ @staticmethod ++ def deal_pam_d_config(host_info, directory_path): ++ # 先获取/etc/pam.d下有哪些文件 ++ status, content = ObjectFileConfig.object_file_config_content( ++ host_info, directory_path ++ ) ++ if status == state.SUCCEED: ++ content_dict = json.loads(content) ++ directory_paths = content_dict.get("resp") ++ return directory_paths ++ return [] ++ ++ @staticmethod ++ def deal_host_file_content(domain_result, host_file_content_result): ++ host_id = host_file_content_result.get("host_id") ++ infos = host_file_content_result.get("infos") ++ file_content_list = [] ++ pam_d_file_list = [] ++ if infos: ++ for info in infos: ++ pam_d_file = {} ++ info_path = str(info.get("path")) ++ for file_path in DIRECTORY_FILE_PATH_LIST: ++ if info_path.find(file_path) == -1: ++ signal_file_content = { ++ "filePath": info.get("path"), ++ "contents": info.get("content"), ++ } ++ file_content_list.append(signal_file_content) ++ else: ++ pam_d_file[info_path] = info.get("content") ++ pam_d_file_list.append(pam_d_file) ++ if pam_d_file_list: ++ directory_file_dict = {} ++ for file_path in DIRECTORY_FILE_PATH_LIST: ++ directory_file_dict[file_path] = {} ++ for path, content_list in directory_file_dict.items(): ++ for pam_d_file in pam_d_file_list: ++ pam_d_file_path = str(list(pam_d_file.keys())[0]) ++ if path in pam_d_file_path: ++ content_list[pam_d_file_path] = pam_d_file.get(pam_d_file_path) ++ for key, value in directory_file_dict.items(): ++ pam_d_file_content = {"filePath": key, "contents": json.dumps(value)} ++ file_content_list.append(pam_d_file_content) ++ if file_content_list: ++ domain_result[str(host_id)] = file_content_list ++ ++ def collect_file_infos(self, param, host_infos_result): ++ # 组装host_id和要获取内容的文件列表 一一对应 ++ domain_result = {} ++ host_id_with_config_file = {} ++ for host in param.get("infos"): ++ 
host_id_with_config_file[host.get("host_id")] = host.get("config_list")
++
++        for host_id, file_list in host_id_with_config_file.items():
++            host_info = host_infos_result.get(host_id)
++            # handle the /etc/pam.d directory entries
++            for file_path in DIRECTORY_FILE_PATH_LIST:
++                if file_path in file_list:
++                    file_list.remove(file_path)
++                    object_file_paths = self.deal_pam_d_config(host_info, file_path)
++                    if object_file_paths:
++                        file_list.extend(object_file_paths)
++            host_file_content_result = CollectConfig.get_file_content(
++                host_info, file_list
++            )
++            # process the collected result
++            self.deal_host_file_content(domain_result, host_file_content_result)
++        return domain_result
++
++    @staticmethod
++    def make_database_engine():
++        engine_url = make_mysql_engine_url(configuration)
++        MysqlProxy.engine = create_database_engine(
++            engine_url,
++            configuration.mysql.get("POOL_SIZE"),
++            configuration.mysql.get("POOL_RECYCLE"),
++        )
++
++    @staticmethod
++    def get_domain_host_ids(domain_list_resp, host_sync_proxy):
++        domain_host_id_dict = {}
++        for domain in domain_list_resp:
++            domain_name = domain["domainName"]
++            status, host_sync_infos = host_sync_proxy.get_domain_host_sync_status(
++                domain_name
++            )
++            if status != SUCCEED or not host_sync_infos:
++                continue
++            host_ids = [host_sync["host_id"] for host_sync in host_sync_infos]
++            domain_host_id_dict[domain_name] = host_ids
++        return domain_host_id_dict
++
++    @staticmethod
++    def get_all_host_infos():
++        host_infos_result = {}
++        proxy = HostProxy()
++        proxy.connect()
++        status, host_list = proxy.get_host_info(
++            {"username": "admin", "host_list": list()}, True
++        )
++        if status != state.SUCCEED:
++            return {}
++        for host in host_list:
++            host_infos_result[host["host_id"]] = host
++        return host_infos_result
++
++    @staticmethod
++    def compare_conf(expected_confs_resp, domain_result):
++        headers = {"Content-Type": "application/json"}
++        # load the ragdoll conf-diff interface url
++        domain_conf_diff_url = ConfTools.load_url_by_conf().get("domain_conf_diff_url")
++        # call the ragdoll diff interface to compare against the baseline
++        try:
++            request_data = {
++                "expectedConfsResp": expected_confs_resp,
++                "domainResult": domain_result,
++            }
++            domain_diff_response = requests.post(
++                domain_conf_diff_url, data=json.dumps(request_data), headers=headers
++            )
++            domain_diff_resp = json.loads(domain_diff_response.text)
++            if domain_diff_resp.get("data"):
++                return domain_diff_resp.get("data")
++            return []
++        except requests.exceptions.RequestException as connect_ex:
++            LOGGER.error(f"Failed to get domain conf diff, an error occurred: {connect_ex}")
++            return []
++
++    @staticmethod
++    def update_sync_status_for_db(domain_diff_resp, host_sync_proxy):
++        if domain_diff_resp:
++            status, save_ids = host_sync_proxy.update_domain_host_sync_status(
++                domain_diff_resp
++            )
++            update_result = sum(save_ids)
++            if status != SUCCEED or update_result == 0:
++                LOGGER.error("failed to update host sync status data")
++            if update_result > 0:
++                LOGGER.info(
++                    "update %s host sync status basic info succeed", update_result
++                )
++        else:
++            LOGGER.info("no host sync status data need to update")
++        return
++
++    def execute(self):
++        headers = {"Content-Type": "application/json"}
++        # fetch all domains
++        domain_list_url = ConfTools.load_url_by_conf().get("domain_list_url")
++        try:
++            domain_list_response = requests.post(domain_list_url, data=json.dumps({}), headers=headers)
++            domain_list_resp = json.loads(domain_list_response.text)
++        except requests.exceptions.RequestException as connect_ex:
++            LOGGER.error(f"Failed to get domain list, an error occurred: {connect_ex}")
++            return
++        # handle the response
++        if not 
domain_list_resp.get("data"): ++ LOGGER.error( ++ "Failed to get all domain, please check interface /domain/queryDomain" ++ ) ++ return ++ ++ # 调用ragdoll query_excepted_confs接口获取所有业务域的基线配置内容 ++ domain_list_url = ConfTools.load_url_by_conf().get("expected_confs_url") ++ domain_names = {"domainNames": domain_list_resp.get("data")} ++ try: ++ expected_confs_response = requests.post( ++ domain_list_url, data=json.dumps(domain_names), headers=headers ++ ) ++ expected_confs_resp = json.loads(expected_confs_response.text) ++ except requests.exceptions.RequestException as connect_ex: ++ LOGGER.error( ++ f"Failed to get all domain expected conf list, an error occurred: {connect_ex}" ++ ) ++ return ++ if not expected_confs_resp.get("data"): ++ LOGGER.error( ++ "Failed to get all domain confs, please check interface /confs/queryExpectedConfs" ++ ) ++ return ++ ++ # 方式一 创建数据引擎 ++ self.make_database_engine() ++ # 方式一 根据domain获取所有的id,从host_conf_sync_status表中读取 ++ host_sync_proxy = HostSyncProxy() ++ host_sync_proxy.connect() ++ domain_host_id_dict = self.get_domain_host_ids( ++ domain_list_resp.get("data"), host_sync_proxy ++ ) ++ if not domain_host_id_dict: ++ LOGGER.info("no host sync status data need to update") ++ return ++ # 获取所有admin下面的ip的信息 ++ host_infos_result = self.get_all_host_infos() ++ if not host_infos_result: ++ LOGGER.info("no host sync status data need to update") ++ return ++ ++ # 方式一 组装参数并调用CollectConfig接口get_file_content获取文件真实内容 ++ domain_paths = {} ++ self.get_domain_files(domain_paths, expected_confs_resp.get("data")) ++ ++ domain_result = {} ++ for domain_name, host_id_list in domain_host_id_dict.items(): ++ data = {"infos": []} ++ file_paths = domain_paths.get(domain_name) ++ if file_paths: ++ for host_id in host_id_list: ++ data_info = {"host_id": host_id, "config_list": file_paths} ++ data["infos"].append(data_info) ++ if data["infos"]: ++ result = self.collect_file_infos(data, host_infos_result) ++ domain_result[domain_name] = result ++ # 调用ragdoll接口进行对比 ++ domain_diff_resp = self.compare_conf(expected_confs_resp.get("data"), domain_result) ++ # 根据结果更新数据库 ++ self.update_sync_status_for_db(domain_diff_resp, host_sync_proxy) +diff --git a/zeus/database/__init__.py b/zeus/database/__init__.py +index 2b8a163..2077be7 100644 +--- a/zeus/database/__init__.py ++++ b/zeus/database/__init__.py +@@ -15,7 +15,6 @@ Time: + Author: + Description: + """ +-from flask import g + from sqlalchemy.orm import sessionmaker + from sqlalchemy.orm.scoping import scoped_session + from vulcanus.database.helper import make_mysql_engine_url +diff --git a/zeus/database/proxy/conf_trace.py b/zeus/database/proxy/conf_trace.py +new file mode 100644 +index 0000000..e0e032f +--- /dev/null ++++ b/zeus/database/proxy/conf_trace.py +@@ -0,0 +1,238 @@ ++#!/usr/bin/python3 ++# ****************************************************************************** ++# Copyright (C) 2023 isoftstone Technologies Co., Ltd. All rights reserved. ++# licensed under the Mulan PSL v2. ++# You can use this software according to the terms and conditions of the Mulan PSL v2. ++# You may obtain a copy of Mulan PSL v2 at: ++# http://license.coscl.org.cn/MulanPSL2 ++# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR ++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR ++# PURPOSE. ++# See the Mulan PSL v2 for more details. 
++# ******************************************************************************/
++"""
++@FileName: conf_trace.py
++@Time: 2024/4/23 14:24
++@Author: JiaoSiMao
++Description:
++"""
++import datetime
++import json
++import math
++import uuid
++
++import sqlalchemy
++from sqlalchemy import desc, asc, func
++
++from vulcanus.database.proxy import MysqlProxy
++from vulcanus.log.log import LOGGER
++from vulcanus.restful.resp.state import (
++    DATABASE_INSERT_ERROR,
++    SUCCEED, DATABASE_QUERY_ERROR, DATABASE_DELETE_ERROR,
++)
++
++from zeus.database.table import ConfTraceInfo
++
++
++class ConfTraceProxy(MysqlProxy):
++    """
++    Conf trace related table operation
++    """
++
++    def add_conf_trace_info(self, data):
++        """
++        add conf trace info to table
++
++        Args:
++            data: parameter, e.g.
++                {
++                    "domain_name": "aops",
++                    "host_id": 1,
++                    "file": "/etc/hostname",
++                    ...
++                }
++
++        Returns:
++            int: SUCCEED or DATABASE_INSERT_ERROR
++        """
++        domain_name = data.get('domain_name')
++        host_id = int(data.get('host_id'))
++        conf_name = data.get('file')
++        info = json.dumps(data)
++        conf_trace_info = ConfTraceInfo(UUID=str(uuid.uuid4()), domain_name=domain_name, host_id=host_id,
++                                        conf_name=conf_name, info=info, create_time=datetime.datetime.now())
++        try:
++            self.session.add(conf_trace_info)
++            self.session.commit()
++            LOGGER.info(
++                f"add {conf_trace_info.domain_name} {conf_trace_info.host_id} {conf_trace_info.conf_name} conf trace "
++                f"info succeed")
++            return SUCCEED
++        except sqlalchemy.exc.SQLAlchemyError as error:
++            LOGGER.error(error)
++            LOGGER.error(
++                f"add {conf_trace_info.domain_name} {conf_trace_info.host_id} {conf_trace_info.conf_name} conf trace "
++                f"info fail")
++            self.session.rollback()
++            return DATABASE_INSERT_ERROR
++
++    def query_conf_trace_info(self, data):
++        """
++        query conf trace info from table
++
++        Args:
++            data: parameter, e.g.
++                {
++                    "domain_name": "aops",
++                    "host_id": 1,
++                    "conf_name": "/etc/hostname",
++                }
++
++        Returns:
++            tuple: status code (SUCCEED or DATABASE_QUERY_ERROR) and the query result
++        """
++        result = {}
++        try:
++            result = self._sort_trace_info_by_column(data)
++            self.session.commit()
++            LOGGER.debug("query conf trace info succeed")
++            return SUCCEED, result
++        except sqlalchemy.exc.SQLAlchemyError as error:
++            LOGGER.error(error)
++            LOGGER.error("query conf trace info fail")
++            return DATABASE_QUERY_ERROR, result
++
++    def delete_conf_trace_info(self, data):
++        """
++        delete conf trace info from table
++
++        Args:
++            data: parameter, e.g. 
++ { ++ "domain_name": "aops", ++ "host_ids": [1] ++ } ++ ++ Returns: ++ int: SUCCEED or DATABASE_INSERT_ERROR ++ """ ++ domainName = data['domain_name'] ++ host_ids = data['host_ids'] ++ try: ++ # delete matched conf trace info ++ if host_ids: ++ conf_trace_filters = {ConfTraceInfo.host_id.in_(host_ids), ConfTraceInfo.domain_name == domainName} ++ else: ++ conf_trace_filters = {ConfTraceInfo.domain_name == domainName} ++ confTraceInfos = self.session.query(ConfTraceInfo).filter(*conf_trace_filters).all() ++ for confTraceInfo in confTraceInfos: ++ self.session.delete(confTraceInfo) ++ self.session.commit() ++ return SUCCEED ++ except sqlalchemy.exc.SQLAlchemyError as error: ++ LOGGER.error(error) ++ LOGGER.error("delete conf trace info fail") ++ self.session.rollback() ++ return DATABASE_DELETE_ERROR ++ ++ @staticmethod ++ def _get_conf_trace_filters(data): ++ """ ++ Generate filters ++ ++ Args: ++ data(dict) ++ ++ Returns: ++ set ++ """ ++ domain_name = data.get('domain_name') ++ host_id = data.get('host_id') ++ conf_name = data.get('conf_name') ++ filters = {ConfTraceInfo.host_id > 0} ++ if domain_name: ++ filters.add(ConfTraceInfo.domain_name == domain_name) ++ if host_id: ++ filters.add(ConfTraceInfo.host_id == host_id) ++ if conf_name: ++ filters.add(ConfTraceInfo.conf_name == conf_name) ++ return filters ++ ++ def _get_conf_trace_count(self, filters): ++ """ ++ Query according to filters ++ ++ Args: ++ filters(set): query filters ++ ++ Returns: ++ int ++ """ ++ total_count = self.session.query(func.count(ConfTraceInfo.UUID)).filter(*filters).scalar() ++ return total_count ++ ++ def _sort_trace_info_by_column(self, data): ++ """ ++ Sort conf trace info by specified column ++ ++ Args: ++ data(dict): sorted condition info ++ ++ Returns: ++ dict ++ """ ++ result = {"total_count": 0, "total_page": 0, "conf_trace_infos": []} ++ sort = data.get('sort') ++ direction = desc if data.get('direction') == 'desc' else asc ++ page = data.get('page') ++ per_page = data.get('per_page') ++ total_page = 1 ++ filters = self._get_conf_trace_filters(data) ++ total_count = self._get_conf_trace_count(filters) ++ if total_count == 0: ++ return result ++ ++ if sort: ++ if page and per_page: ++ total_page = math.ceil(total_count / per_page) ++ conf_trace_infos = ( ++ self.session.query(ConfTraceInfo) ++ .filter(*filters) ++ .order_by(direction(getattr(ConfTraceInfo, sort))) ++ .offset((page - 1) * per_page) ++ .limit(per_page) ++ .all() ++ ) ++ else: ++ conf_trace_infos = self.session.query(ConfTraceInfo).filter(*filters).order_by( ++ direction(getattr(ConfTraceInfo, sort))).all() ++ else: ++ if page and per_page: ++ total_page = math.ceil(total_count / per_page) ++ conf_trace_infos = self.session.query(ConfTraceInfo).filter(*filters).offset( ++ (page - 1) * per_page).limit(per_page).all() ++ else: ++ conf_trace_infos = self.session.query(ConfTraceInfo).filter(*filters).all() ++ ++ LOGGER.error(f"conf_trace_infos is {conf_trace_infos}") ++ for conf_trace_info in conf_trace_infos: ++ info_dict = json.loads(conf_trace_info.info) ++ info_str = f"进程:{info_dict.get('cmd')} 修改了文件:{info_dict.get('file')}" ++ ptrace_data = "=> ".join(f"{item['cmd']}:{item['pid']}" for item in info_dict.get('ptrace')) ++ ptrace = f"{info_dict.get('cmd')} => {ptrace_data}" ++ conf_trace_info = { ++ "UUID": conf_trace_info.UUID, ++ "domain_name": conf_trace_info.domain_name, ++ "host_id": conf_trace_info.host_id, ++ "conf_name": conf_trace_info.conf_name, ++ "info": info_str, ++ "create_time": str(conf_trace_info.create_time), ++ 
"ptrace": ptrace ++ } ++ result["conf_trace_infos"].append(conf_trace_info) ++ ++ result["total_page"] = total_page ++ result["total_count"] = total_count ++ LOGGER.error(f"result is {result}") ++ return result +diff --git a/zeus/database/proxy/host.py b/zeus/database/proxy/host.py +index 2e4a6ce..1dad1fd 100644 +--- a/zeus/database/proxy/host.py ++++ b/zeus/database/proxy/host.py +@@ -278,6 +278,7 @@ class HostProxy(MysqlProxy): + "management": host.management, + "scene": host.scene, + "os_version": host.os_version, ++ "status": host.status, + "ssh_port": host.ssh_port, + } + result['host_infos'].append(host_info) +@@ -346,6 +347,64 @@ class HostProxy(MysqlProxy): + LOGGER.error("query host %s basic info fail", host_list) + return DATABASE_QUERY_ERROR, result + ++ def get_host_info_by_host_id(self, data): ++ """ ++ Get host basic info according to host id from table ++ ++ Args: ++ data(dict): parameter, e.g. ++ { ++ "username": "admin" ++ "host_list": ["id1", "id2"] ++ } ++ is_collect_file (bool) ++ ++ Returns: ++ int: status code ++ dict: query result ++ """ ++ host_list = data.get('host_list') ++ result = [] ++ query_fields = [ ++ Host.host_id, ++ Host.host_name, ++ Host.host_ip, ++ Host.os_version, ++ Host.ssh_port, ++ Host.host_group_name, ++ Host.management, ++ Host.status, ++ Host.scene, ++ Host.pkey, ++ Host.ssh_user ++ ] ++ filters = set() ++ if host_list: ++ filters.add(Host.host_id.in_(host_list)) ++ try: ++ hosts = self.session.query(*query_fields).filter(*filters).all() ++ for host in hosts: ++ host_info = { ++ "host_id": host.host_id, ++ "host_group_name": host.host_group_name, ++ "host_name": host.host_name, ++ "host_ip": host.host_ip, ++ "management": host.management, ++ "status": host.status, ++ "scene": host.scene, ++ "os_version": host.os_version, ++ "ssh_port": host.ssh_port, ++ "pkey": host.pkey, ++ "ssh_user": host.ssh_user, ++ } ++ result.append(host_info) ++ LOGGER.debug("query host %s basic info succeed", host_list) ++ return SUCCEED, result ++ except sqlalchemy.exc.SQLAlchemyError as error: ++ LOGGER.error(error) ++ LOGGER.error("query host %s basic info fail", host_list) ++ return DATABASE_QUERY_ERROR, result ++ + def get_host_ssh_info(self, data): + """ + Get host ssh info according to host id from table +diff --git a/zeus/database/proxy/host_sync_status.py b/zeus/database/proxy/host_sync_status.py +new file mode 100644 +index 0000000..7f4e165 +--- /dev/null ++++ b/zeus/database/proxy/host_sync_status.py +@@ -0,0 +1,208 @@ ++#!/usr/bin/python3 ++# ****************************************************************************** ++# Copyright (C) 2023 isoftstone Technologies Co., Ltd. All rights reserved. ++# licensed under the Mulan PSL v2. ++# You can use this software according to the terms and conditions of the Mulan PSL v2. ++# You may obtain a copy of Mulan PSL v2 at: ++# http://license.coscl.org.cn/MulanPSL2 ++# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR ++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR ++# PURPOSE. ++# See the Mulan PSL v2 for more details. 
+diff --git a/zeus/database/proxy/host_sync_status.py b/zeus/database/proxy/host_sync_status.py
+new file mode 100644
+index 0000000..7f4e165
+--- /dev/null
++++ b/zeus/database/proxy/host_sync_status.py
+@@ -0,0 +1,208 @@
++#!/usr/bin/python3
++# ******************************************************************************
++# Copyright (C) 2023 isoftstone Technologies Co., Ltd. All rights reserved.
++# licensed under the Mulan PSL v2.
++# You can use this software according to the terms and conditions of the Mulan PSL v2.
++# You may obtain a copy of Mulan PSL v2 at:
++#     http://license.coscl.org.cn/MulanPSL2
++# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
++# PURPOSE.
++# See the Mulan PSL v2 for more details.
++# ******************************************************************************/
++"""
++Time:
++Author:
++Description: host config sync status table operation
++"""
++from typing import Tuple
++
++import sqlalchemy
++from vulcanus.database.proxy import MysqlProxy
++from vulcanus.log.log import LOGGER
++from vulcanus.restful.resp.state import (
++    DATABASE_DELETE_ERROR,
++    DATABASE_INSERT_ERROR,
++    DATABASE_QUERY_ERROR,
++    SUCCEED,
++)
++from zeus.database.table import HostSyncStatus
++
++
++class HostSyncProxy(MysqlProxy):
++    """
++    Host config sync status related table operation
++    """
++
++    def add_host_sync_status(self, data) -> int:
++        """
++        Add host sync status record to table
++
++        Args:
++            data(dict): parameter, e.g.
++                {
++                    "host_id": 1,
++                    "host_ip": "192.168.1.1",
++                    "domain_name": "aops",
++                    "sync_status": 0
++                }
++
++        Returns:
++            int: SUCCEED or DATABASE_INSERT_ERROR
++        """
++        host_id = data.get('host_id')
++        host_ip = str(data.get('host_ip'))
++        domain_name = data.get('domain_name')
++        sync_status = data.get('sync_status')
++        host_sync_status = HostSyncStatus(
++            host_id=host_id, host_ip=host_ip, domain_name=domain_name, sync_status=sync_status
++        )
++        try:
++            self.session.add(host_sync_status)
++            self.session.commit()
++            LOGGER.info(f"add {host_sync_status.domain_name} {host_sync_status.host_ip} host sync status succeed")
++            return SUCCEED
++        except sqlalchemy.exc.SQLAlchemyError as error:
++            LOGGER.error(error)
++            LOGGER.error(f"add {host_sync_status.domain_name} {host_sync_status.host_ip} host sync status fail")
++            self.session.rollback()
++            return DATABASE_INSERT_ERROR
++
++    def add_host_sync_status_batch(self, host_sync_list: list) -> str:
++        """
++        Add host sync status records to the table in batches
++
++        Args:
++            host_sync_list(list): list of HostSyncStatus objects
++
++        Returns:
++            str: SUCCEED or DATABASE_INSERT_ERROR
++        """
++        try:
++            self.session.bulk_save_objects(host_sync_list)
++            self.session.commit()
++            LOGGER.info(f"add host {[host_sync_status.host_ip for host_sync_status in host_sync_list]} succeed")
++            return SUCCEED
++        except sqlalchemy.exc.SQLAlchemyError as error:
++            LOGGER.error(error)
++            self.session.rollback()
++            return DATABASE_INSERT_ERROR
++
++    def delete_host_sync_status(self, data):
++        """
++        Delete host sync status from table
++
++        Args:
++            data(dict): parameter, e.g.
++                {
++                    "host_id": 1,
++                    "domain_name": "aops",
++                }
++
++        Returns:
++            int
++        """
++        host_id = data['host_id']
++        domain_name = data['domain_name']
++        try:
++            # query matched host sync status
++            host_sync_records = (
++                self.session.query(HostSyncStatus)
++                .filter(HostSyncStatus.host_id == host_id)
++                .filter(HostSyncStatus.domain_name == domain_name)
++                .all()
++            )
++            for host_sync in host_sync_records:
++                self.session.delete(host_sync)
++            self.session.commit()
++            LOGGER.info(f"delete {domain_name} {host_id} host sync status succeed")
++            return SUCCEED
++        except sqlalchemy.exc.SQLAlchemyError as error:
++            LOGGER.error(error)
++            LOGGER.error("delete host sync status fail")
++            self.session.rollback()
++            return DATABASE_DELETE_ERROR
++
++    def delete_all_host_sync_status(self, data):
++        """
++        Delete host sync status from table, for a given id list or for all hosts of a domain
++
++        Args:
++            data(dict): parameter, e.g.
++                {
++                    "host_ids": [1],
++                    "domain_name": "aops",
++                }
++
++        Returns:
++            int
++        """
++        host_ids = data['host_ids']
++        domain_name = data['domain_name']
++        try:
++            # query matched host sync status
++            if host_ids:
++                host_conf_sync_filters = {
++                    HostSyncStatus.host_id.in_(host_ids),
++                    HostSyncStatus.domain_name == domain_name,
++                }
++            else:
++                host_conf_sync_filters = {HostSyncStatus.domain_name == domain_name}
++            host_sync_records = self.session.query(HostSyncStatus).filter(*host_conf_sync_filters).all()
++            for host_sync in host_sync_records:
++                self.session.delete(host_sync)
++            self.session.commit()
++            LOGGER.info(f"delete {domain_name} {host_ids} host sync status succeed")
++            return SUCCEED
++        except sqlalchemy.exc.SQLAlchemyError as error:
++            LOGGER.error(error)
++            LOGGER.error("delete host sync status fail")
++            self.session.rollback()
++            return DATABASE_DELETE_ERROR
++
++    def get_host_sync_status(self, data) -> Tuple[int, dict]:
++        """
++        Query a single host sync status record by host id and domain name
++
++        Args:
++            data(dict): parameter with host_id and domain_name
++
++        Returns:
++            tuple: status code, HostSyncStatus object or None
++        """
++        host_id = data['host_id']
++        domain_name = data['domain_name']
++        try:
++            host_sync_status = (
++                self.session.query(HostSyncStatus)
++                .filter(HostSyncStatus.host_id == host_id)
++                .filter(HostSyncStatus.domain_name == domain_name)
++                .one_or_none()
++            )
++            return SUCCEED, host_sync_status
++        except sqlalchemy.exc.SQLAlchemyError as error:
++            LOGGER.error(error)
++            return DATABASE_QUERY_ERROR, {}
++
++    def get_domain_host_sync_status(self, domain_name: str):
++        """
++        Query sync status of all hosts in a domain
++
++        Args:
++            domain_name(str)
++
++        Returns:
++            tuple: status code, list of host sync status dicts
++        """
++        try:
++            host_sync_status = self.session.query(HostSyncStatus).filter(
++                HostSyncStatus.domain_name == domain_name).all()
++            result = []
++            for host_sync in host_sync_status:
++                single_host_sync_status = {
++                    "host_id": host_sync.host_id,
++                    "host_ip": host_sync.host_ip,
++                    "domain_name": host_sync.domain_name,
++                    "sync_status": host_sync.sync_status
++                }
++                result.append(single_host_sync_status)
++            LOGGER.debug("query host sync status %s basic info succeed", result)
++            return SUCCEED, result
++        except sqlalchemy.exc.SQLAlchemyError as error:
++            LOGGER.error(error)
++            return DATABASE_QUERY_ERROR, []
++
++    def update_domain_host_sync_status(self, domain_diff_resp: list):
++        """
++        Update sync status for the (host_id, domain_name) pairs in domain_diff_resp
++
++        Args:
++            domain_diff_resp(list): list of dicts with host_id, domain_name and the columns to update
++
++        Returns:
++            tuple: status code, list of per-row update counts
++        """
++        try:
++            saved_ids = []
++            for domain_diff in domain_diff_resp:
++                update_count = (
++                    self.session.query(HostSyncStatus)
++                    .filter(HostSyncStatus.host_id == domain_diff.get("host_id"))
++                    .filter(HostSyncStatus.domain_name == domain_diff.get("domain_name"))
++                    .update(domain_diff)
++                )
++                saved_ids.append(update_count)
++            self.session.commit()
++            LOGGER.debug("update host sync status succeed for %s rows", sum(saved_ids))
++            if saved_ids:
++                return SUCCEED, saved_ids
++            return DATABASE_QUERY_ERROR, []
++        except sqlalchemy.exc.SQLAlchemyError as error:
++            LOGGER.error(error)
++            return DATABASE_QUERY_ERROR, []
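
Taken together, the proxy gives the sync-status workflow its whole CRUD surface. A usage sketch (payload shapes taken from the docstrings above; assumes a configured database connection, and that HostSyncProxy supports the same context-manager protocol HostProxy uses elsewhere in this patch):

    from zeus.database.proxy.host_sync_status import HostSyncProxy

    with HostSyncProxy() as proxy:
        proxy.add_host_sync_status({"host_id": 1, "host_ip": "192.168.1.1", "domain_name": "aops", "sync_status": 0})
        # mark host 1 as synced; the whole dict is passed to Query.update, so keys must be table columns
        proxy.update_domain_host_sync_status([{"host_id": 1, "domain_name": "aops", "sync_status": 1}])
        status, result = proxy.get_domain_host_sync_status("aops")
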
+diff --git a/zeus/database/table.py b/zeus/database/table.py
+index 9cf604b..e9c20ec 100644
+--- a/zeus/database/table.py
++++ b/zeus/database/table.py
+@@ -15,7 +15,9 @@ Time:
+ Author:
+ Description: mysql tables
+ """
+-from sqlalchemy import Column, ForeignKey
++import datetime
++
++from sqlalchemy import Column, ForeignKey, DateTime, Text
+ from sqlalchemy.ext.declarative import declarative_base
+ from sqlalchemy.orm import relationship
+ from sqlalchemy.sql.sqltypes import Boolean, Integer, String
+@@ -132,6 +134,33 @@ class Auth(Base, MyBase):
+     username = Column(String(40), ForeignKey('user.username'))
+ 
+ 
++class HostSyncStatus(Base, MyBase):
++    """
++    HostSyncStatus table
++    """
++
++    __tablename__ = "host_conf_sync_status"
++
++    host_id = Column(Integer, primary_key=True)
++    host_ip = Column(String(16), nullable=False)
++    domain_name = Column(String(16), primary_key=True)
++    sync_status = Column(Integer, default=0)
++
++
++class ConfTraceInfo(Base, MyBase):
++    """
++    ConfTraceInfo table
++    """
++    __tablename__ = "conf_trace_info"
++
++    UUID = Column(String(36), primary_key=True)
++    domain_name = Column(String(16))
++    host_id = Column(Integer)
++    conf_name = Column(String(100))
++    info = Column(Text)
++    # the default must be a callable that returns a datetime; a bare datetime.datetime would raise on insert
++    create_time = Column(DateTime, default=datetime.datetime.now)
++
++
+ def create_utils_tables(base, engine):
+     """
+     Create basic database tables, e.g. user, host, hostgroup
+@@ -142,6 +171,6 @@ def create_utils_tables(base, engine):
+         engine (instance): _engine.Engine instance
+     """
+     # pay attention, the sequence of list is important. Base table need to be listed first.
+-    tables = [User, HostGroup, Host, Auth]
++    tables = [User, HostGroup, Host, Auth, HostSyncStatus, ConfTraceInfo]
+     tables_objects = [base.metadata.tables[table.__tablename__] for table in tables]
+     create_tables(base, engine, tables=tables_objects)
+diff --git a/zeus/function/verify/conf_trace.py b/zeus/function/verify/conf_trace.py
+new file mode 100644
+index 0000000..932c369
+--- /dev/null
++++ b/zeus/function/verify/conf_trace.py
+@@ -0,0 +1,70 @@
++#!/usr/bin/python3
++# ******************************************************************************
++# Copyright (C) 2023 isoftstone Technologies Co., Ltd. All rights reserved.
++# licensed under the Mulan PSL v2.
++# You can use this software according to the terms and conditions of the Mulan PSL v2.
++# You may obtain a copy of Mulan PSL v2 at:
++#     http://license.coscl.org.cn/MulanPSL2
++# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
++# PURPOSE.
++# See the Mulan PSL v2 for more details.
++# ******************************************************************************/ ++""" ++@FileName: conf_trace.py ++@Time: 2024/4/19 10:23 ++@Author: JiaoSiMao ++Description: ++""" ++from marshmallow import Schema, fields, validate ++ ++ ++class ConfTraceMgmtSchema(Schema): ++ """ ++ validators for parameter of /conftrace/mgmt ++ """ ++ host_ids = fields.List(fields.Integer(), required=True) ++ action = fields.Str(required=True, validate=validate.OneOf(['stop', 'start', 'update'])) ++ conf_files = fields.List(fields.String(), required=False) ++ domain_name = fields.String(required=True, validate=lambda s: len(s) > 0) ++ ++ ++class PtraceSchema(Schema): ++ cmd = fields.String(required=True, validate=lambda s: len(s) > 0) ++ pid = fields.Integer(required=True) ++ ++ ++class ConfTraceDataSchema(Schema): ++ """ ++ validators for parameter of /conftrace/data ++ """ ++ host_id = fields.Integer(required=True, validate=lambda s: s > 0) ++ domain_name = fields.String(required=True, validate=lambda s: len(s) > 0) ++ file = fields.String(required=True, validate=lambda s: len(s) > 0) ++ syscall = fields.String(required=True) ++ pid = fields.Integer(required=True, validate=lambda s: s > 0) ++ inode = fields.Integer(required=True) ++ cmd = fields.String(required=True, validate=lambda s: len(s) > 0) ++ ptrace = fields.List(fields.Nested(PtraceSchema()), required=True) ++ flag = fields.Integer(required=True) ++ ++ ++class ConfTraceQuerySchema(Schema): ++ """ ++ validators for parameter of /conftrace/query ++ """ ++ domain_name = fields.String(required=True, validate=lambda s: len(s) > 0) ++ host_id = fields.Integer(required=True, validate=lambda s: s > 0) ++ conf_name = fields.String(required=True, validate=lambda s: len(s) > 0) ++ sort = fields.String(required=False, validate=validate.OneOf(["create_time", "host_id", ""])) ++ direction = fields.String(required=False, validate=validate.OneOf(["desc", "asc"])) ++ page = fields.Integer(required=False, validate=lambda s: s > 0) ++ per_page = fields.Integer(required=False, validate=lambda s: 50 > s > 0) ++ ++ ++class ConfTraceDataDeleteSchema(Schema): ++ """ ++ validators for parameter of /conftrace/delete ++ """ ++ domain_name = fields.String(required=True, validate=lambda s: len(s) > 0) ++ host_ids = fields.List(fields.Integer(), required=False) +diff --git a/zeus/function/verify/config.py b/zeus/function/verify/config.py +index 1ef7b97..021a45f 100644 +--- a/zeus/function/verify/config.py ++++ b/zeus/function/verify/config.py +@@ -53,3 +53,18 @@ class ObjectFileConfigSchema(Schema): + """ + host_id = fields.Integer(required=True, validate=lambda s: s > 0) + file_directory = fields.String(required=True, validate=lambda s: len(s) > 0) ++ ++ ++class SingleSyncConfig(Schema): ++ file_path = fields.String(required=True, validate=lambda s: len(s) > 0) ++ content = fields.String(required=True, validate=lambda s: len(s) > 0) ++ ++ ++class BatchSyncConfigSchema(Schema): ++ """ ++ validators for SyncConfigSchema ++ """ ++ host_ids = fields.List(fields.Integer(required=True, validate=lambda s: s > 0), required=True, ++ validate=lambda s: len(s) > 0) ++ file_path_infos = fields.List(fields.Nested(SingleSyncConfig(), required=True), required=True, ++ validate=lambda s: len(s) > 0) +diff --git a/zeus/function/verify/host.py b/zeus/function/verify/host.py +index 7dedfee..48c434c 100644 +--- a/zeus/function/verify/host.py ++++ b/zeus/function/verify/host.py +@@ -149,3 +149,39 @@ class UpdateHostSchema(Schema): + host_group_name = fields.String(required=False, 
validate=lambda s: 20 >= len(s) > 0) + management = fields.Boolean(required=False, truthy={True}, falsy={False}) + ssh_pkey = fields.String(required=False, validate=lambda s: 4096 >= len(s) >= 0) ++ ++ ++class AddHostSyncStatusSchema(Schema): ++ """ ++ validators for parameter of /manage/host/sync/status/add ++ """ ++ ++ host_id = fields.Integer(required=True, validate=lambda s: s > 0) ++ host_ip = fields.IP(required=True) ++ domain_name = fields.String(required=True, validate=lambda s: len(s) > 0) ++ sync_status = fields.Integer(required=True, validate=lambda s: s >= 0) ++ ++ ++class DeleteHostSyncStatusSchema(Schema): ++ """ ++ validators for parameter of /manage/host/sync/status/delete ++ """ ++ ++ host_id = fields.Integer(required=True, validate=lambda s: s > 0) ++ domain_name = fields.String(required=True, validate=lambda s: len(s) > 0) ++ ++ ++class DeleteAllHostSyncStatusSchema(Schema): ++ """ ++ validators for parameter of /manage/host/sync/status/delete ++ """ ++ ++ host_ids = fields.List(fields.Integer(), required=False) ++ domain_name = fields.String(required=True, validate=lambda s: len(s) > 0) ++ ++ ++class GetHostSyncStatusSchema(Schema): ++ """ ++ validators for parameter of /manage/host/sync/status/get ++ """ ++ domain_name = fields.String(required=True) +diff --git a/zeus/host_manager/ssh.py b/zeus/host_manager/ssh.py +index a4e7628..fa6d2c0 100644 +--- a/zeus/host_manager/ssh.py ++++ b/zeus/host_manager/ssh.py +@@ -12,14 +12,14 @@ + # ******************************************************************************/ + import socket + from io import StringIO +-from typing import Tuple ++from typing import Tuple, Union + + import paramiko + + from vulcanus.log.log import LOGGER + from vulcanus.restful.resp import state + +-__all__ = ["SSH", "generate_key", "execute_command_and_parse_its_result"] ++__all__ = ["SSH", "InteroperableSSH", "generate_key", "execute_command_and_parse_its_result"] + + from zeus.function.model import ClientConnectArgs + +@@ -57,13 +57,7 @@ class SSH: + """ + + def __init__(self, ip, username, port, password=None, pkey=None): +- self._client_args = { +- 'hostname': ip, +- 'username': username, +- 'port': port, +- "password": password, +- "pkey": pkey +- } ++ self._client_args = {'hostname': ip, 'username': username, 'port': port, "password": password, "pkey": pkey} + self._client = self.client() + + def client(self): +@@ -77,15 +71,15 @@ class SSH: + + def execute_command(self, command: str, timeout: float = None) -> tuple: + """ +- create a ssh client, execute command and parse result ++ create a ssh client, execute command and parse result + +- Args: +- command(str): shell command +- timeout(float): the maximum time to wait for the result of command execution ++ Args: ++ command(str): shell command ++ timeout(float): the maximum time to wait for the result of command execution + +- Returns: +- tuple: +- status, result, error message ++ Returns: ++ tuple: ++ status, result, error message + """ + open_channel = self._client.get_transport().open_session(timeout=timeout) + open_channel.set_combine_stderr(False) +@@ -102,6 +96,94 @@ class SSH: + self._client.close() + + ++class InteroperableSSH: ++ """ ++ An interactive SSH client used to run command in remote node ++ ++ Attributes: ++ ip(str): host ip address, the field is used to record ip information in method paramiko.SSHClient() ++ username(str): remote login user ++ port(int or str): remote login port ++ password(str) ++ pkey(str): RSA-KEY string ++ ++ Notes: ++ In this project, the password field is 
used when connect to the host for the first ++ time, and the pkey field is used when need to execute the command on the client. ++ """ ++ ++ def __init__( ++ self, ++ ip: str, ++ port: Union[int, str], ++ username: str = 'root', ++ password: str = None, ++ pkey: str = None, ++ channel_timeout=1000, ++ recv_buffer: int = 4096, ++ ) -> None: ++ self.__client = paramiko.SSHClient() ++ self.__client.load_system_host_keys() ++ self.__client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ++ self.__client.connect( ++ hostname=ip, ++ port=int(port), ++ username=username, ++ password=password, ++ pkey=paramiko.RSAKey.from_private_key(StringIO(pkey)), ++ timeout=5, ++ ) ++ ++ # Open an SSH channel and start a shell ++ self.__chan = self.__client.get_transport().open_session() ++ self.__chan.get_pty() ++ self.__chan.invoke_shell() ++ self.__chan.settimeout(channel_timeout) ++ ++ self.buffer = recv_buffer ++ ++ @property ++ def is_active(self): ++ """ ++ Returns the current status of the SSH connection. ++ ++ Returns True if the connection is active, False otherwise. ++ """ ++ return self.__client.get_transport().is_active() ++ ++ def send(self, cmd: str): ++ """ ++ Sends a command to the SSH channel. ++ ++ cmd: The command to send, e.g., 'ls -l'. ++ """ ++ self.__chan.send(cmd) ++ ++ def recv(self) -> str: ++ """ ++ Receives data from the SSH channel and decodes it into a UTF-8 string. ++ ++ Returns: The received data, e.g., 'file1\nfile2\n'. ++ """ ++ ++ return self.__chan.recv(self.buffer).decode("utf-8") ++ ++ def close(self): ++ """ ++ close open_channel ++ """ ++ self.__client.close() ++ ++ def resize(self, cols: int, rows: int): ++ """ ++ Resizes the terminal size of the SSH channel. ++ ++ cols: The number of columns for the terminal. ++ rows: The number of rows for the terminal. 
++ """ ++ self.__chan.resize_pty(width=cols, height=rows) ++ ++ + def execute_command_and_parse_its_result(connect_args: ClientConnectArgs, command: str) -> tuple: + """ + create a ssh client, execute command and parse result +@@ -116,14 +198,13 @@ def execute_command_and_parse_its_result(connect_args: ClientConnectArgs, comman + status, result + """ + if not connect_args.pkey: +- return state.SSH_AUTHENTICATION_ERROR, f"ssh authentication failed when connect host " \ +- f"{connect_args.host_ip}" ++ return state.SSH_AUTHENTICATION_ERROR, f"ssh authentication failed when connect host " f"{connect_args.host_ip}" + try: + client = SSH( + ip=connect_args.host_ip, + username=connect_args.ssh_user, + port=connect_args.ssh_port, +- pkey=paramiko.RSAKey.from_private_key(StringIO(connect_args.pkey)) ++ pkey=paramiko.RSAKey.from_private_key(StringIO(connect_args.pkey)), + ) + exit_status, stdout, stderr = client.execute_command(command, connect_args.timeout) + except socket.error as error: +@@ -155,14 +236,13 @@ def execute_command_sftp_result(connect_args: ClientConnectArgs, local_path=None + """ + global sftp_client, client + if not connect_args.pkey: +- return state.SSH_AUTHENTICATION_ERROR, f"ssh authentication failed when connect host " \ +- f"{connect_args.host_ip}" ++ return state.SSH_AUTHENTICATION_ERROR, f"ssh authentication failed when connect host " f"{connect_args.host_ip}" + try: + client = SSH( + ip=connect_args.host_ip, + username=connect_args.ssh_user, + port=connect_args.ssh_port, +- pkey=paramiko.RSAKey.from_private_key(StringIO(connect_args.pkey)) ++ pkey=paramiko.RSAKey.from_private_key(StringIO(connect_args.pkey)), + ) + sftp_client = client.client().open_sftp() + +diff --git a/zeus/host_manager/terminal.py b/zeus/host_manager/terminal.py +new file mode 100644 +index 0000000..e9ce452 +--- /dev/null ++++ b/zeus/host_manager/terminal.py +@@ -0,0 +1,250 @@ ++#!/usr/bin/python3 ++# ****************************************************************************** ++# Copyright (c) Huawei Technologies Co., Ltd. 2022-2022. All rights reserved. ++# licensed under the Mulan PSL v2. ++# You can use this software according to the terms and conditions of the Mulan PSL v2. ++# You may obtain a copy of Mulan PSL v2 at: ++# http://license.coscl.org.cn/MulanPSL2 ++# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR ++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR ++# PURPOSE. ++# See the Mulan PSL v2 for more details. ++# ******************************************************************************/ ++import sqlalchemy ++from flask import request ++from flask_socketio import SocketIO, Namespace, join_room, leave_room ++from vulcanus.log.log import LOGGER ++from vulcanus.exceptions import DatabaseConnectionFailed ++from zeus.database.proxy.host import HostProxy ++from zeus.host_manager.utils.sockets import XtermRoom ++from zeus.database.table import Host ++from zeus.host_manager.ssh import InteroperableSSH ++ ++ ++socketio = SocketIO( ++ cors_allowed_origins="*", ++ async_mode="gevent", ++) ++ ++# init singleton xterm rooms in global properties ++# to avoid duplicated initializing in different sessions. 
++socket_room = XtermRoom(sio=socketio)
++
++
++class TerminalNamspace(Namespace):
++    def on_open(self, event: dict):
++        """
++        Handle terminal open event
++
++        Args:
++            event:
++                ssh_info:
++                    type: object
++                    example: {
++                        host_id(int): 12
++                    }
++                room:
++                    type: string
++                    example: abc
++
++        Returns: None
++        """
++        room_id = event.get("room")
++        ssh_info = event.get("ssh_info")
++
++        if not room_id or not ssh_info:
++            self._handle_error("lack of room or ssh information, fail to establish ssh connection")
++            return
++
++        host_info = self._get_host_info(ssh_info.get('host_id'))
++
++        try:
++            joined = socket_room.join(
++                room_id=room_id,
++                namespace=self.namespace,
++                room_sock=InteroperableSSH(
++                    ip=host_info.get('host_ip', '0.0.0.0'),
++                    port=host_info.get('ssh_port', 22),
++                    username=host_info.get('ssh_user', 'root'),
++                    pkey=host_info.get('pkey'),
++                ),
++            )
++            if not joined:
++                raise RuntimeError(f"could not create socket_room[{room_id}]")
++            join_room(room=room_id)
++        except Exception as error:
++            LOGGER.error(error)
++            socket_room.leave(room_id)
++            leave_room(room_id)
++
++    def on_join(self, event: dict):
++        """
++        Handle join event
++
++        Args:
++            event:
++                room:
++                    type: string
++                    example: abc
++
++        Returns: None
++        """
++        room = event.get("room")
++        if not room:
++            LOGGER.error("lack of room token, fail to join in.")
++            return
++
++        try:
++            socket_room.join(room)
++            join_room(room)
++        except Exception as error:
++            LOGGER.error(error)
++            socket_room.leave(room)
++            leave_room(room)
++
++    def on_stdin(self, event: dict):
++        """
++        Handle stdin event
++
++        Args:
++            event:
++                room:
++                    type: string
++                    e.g.: abc
++                data:
++                    type: string
++                    e.g.: 'ls -a'
++
++        Returns: None
++        """
++        room = event.get("room")
++        data = event.get("data")
++        if not room or not data:
++            return
++
++        if not socket_room.has(room):
++            self._handle_error(f"socket_room['{room}'] does not exist")
++            leave_room(room=room)
++            return
++
++        sent = socket_room.send(room_id=room, data=data)
++        if not sent:
++            self._handle_error(f"socket_room['{room}'] does not exist, could not send data to it.")
++
++    def on_leave(self, event: dict):
++        """
++        Handle leave room event
++
++        Args:
++            event:
++                room:
++                    type: string
++                    e.g.: abc
++
++        Returns: None
++        """
++        room = event.get("room")
++        if not room or not socket_room.has(room):
++            return
++
++        socket_room.leave(room_id=room)
++        leave_room(room)
++
++    def on_resize(self, event: dict):
++        """
++        Handle resize event
++
++        Args:
++            event:
++                room:
++                    type: string
++                    e.g.: abc
++                data:
++                    type: dict
++                    cols:
++                        type: number
++                        e.g.: 30
++                    rows:
++                        type: number
++                        e.g.: 30
++
++        Returns: None
++        """
++        room = event.get("room")
++        data = event.get("data")
++        if not room or not data:
++            return
++
++        if not socket_room.has(room):
++            self._handle_error(f"socket_room[{room}] does not exist")
++            leave_room(room)
++            return
++
++        resized = socket_room.resize(room, data.get("cols"), data.get("rows"))
++        if not resized:
++            self._handle_error(f"socket_room[{room}] does not exist, could not send data to it.")
++
++    def _get_host_info(self, host_id: int):
++        """
++        select host_ip, ssh_port, ssh_user, pkey from host table by host id
++
++        Args:
++            host_id: int e.g. 3
++
++        Returns: host_info
++            dict: e.g.
++ { ++ "host_ip": "127.0.0.1", ++ "ssh_port": 22, ++ "ssh_user": "root", ++ "pkey": "xxxxxxxxxxxxxxxx" ++ } ++ """ ++ query_fields = [ ++ Host.host_ip, ++ Host.ssh_port, ++ Host.pkey, ++ Host.ssh_user, ++ ] ++ host_info = {} ++ ++ try: ++ with HostProxy() as db_proxy: ++ host: Host = db_proxy.session.query(*query_fields).filter(Host.host_id == host_id).first() ++ host_info = { ++ "host_ip": host.host_ip, ++ "ssh_port": host.ssh_port, ++ "pkey": host.pkey, ++ "ssh_user": host.ssh_user, ++ } ++ LOGGER.debug("query host info %s succeed", host_info) ++ return host_info ++ except DatabaseConnectionFailed as connect_error: ++ LOGGER.error('connect database failed, %s', connect_error) ++ return host_info ++ except sqlalchemy.exc.SQLAlchemyError as query_error: ++ LOGGER.error("query host info failed %s", query_error) ++ return host_info ++ ++ def _handle_error(self, err: str): ++ """ ++ unified handling of exceptions ++ """ ++ LOGGER.error( ++ "session[ %s ] connects testbox terminal, failed: { %s }", ++ request.sid, ++ str(err), ++ ) ++ socketio.emit( ++ "error", ++ f"connect failed: {str(err)}", ++ namespace=self.namespace, ++ ) ++ ++ ++socketio.on_namespace(TerminalNamspace("/terminal")) +diff --git a/zeus/host_manager/utils/__init__.py b/zeus/host_manager/utils/__init__.py +new file mode 100644 +index 0000000..cb8be16 +--- /dev/null ++++ b/zeus/host_manager/utils/__init__.py +@@ -0,0 +1,18 @@ ++#!/usr/bin/python3 ++# ****************************************************************************** ++# Copyright (C) 2023 isoftstone Technologies Co., Ltd. All rights reserved. ++# licensed under the Mulan PSL v2. ++# You can use this software according to the terms and conditions of the Mulan PSL v2. ++# You may obtain a copy of Mulan PSL v2 at: ++# http://license.coscl.org.cn/MulanPSL2 ++# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR ++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR ++# PURPOSE. ++# See the Mulan PSL v2 for more details. ++# ******************************************************************************/ ++""" ++@FileName: __init__.py.py ++@Time: 2024/5/29 10:13 ++@Author: JiaoSiMao ++Description: ++""" +diff --git a/zeus/host_manager/utils/sockets.py b/zeus/host_manager/utils/sockets.py +new file mode 100644 +index 0000000..5dfaa75 +--- /dev/null ++++ b/zeus/host_manager/utils/sockets.py +@@ -0,0 +1,198 @@ ++#!/usr/bin/python3 ++# ****************************************************************************** ++# Copyright (c) Huawei Technologies Co., Ltd. 2022-2022. All rights reserved. ++# licensed under the Mulan PSL v2. ++# You can use this software according to the terms and conditions of the Mulan PSL v2. ++# You may obtain a copy of Mulan PSL v2 at: ++# http://license.coscl.org.cn/MulanPSL2 ++# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR ++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR ++# PURPOSE. ++# See the Mulan PSL v2 for more details. ++# ******************************************************************************/ ++import threading ++from flask_socketio import SocketIO ++from vulcanus.log.log import LOGGER ++from zeus.host_manager.ssh import InteroperableSSH ++ ++ ++class XtermRoom: ++ """ ++ This class represents a collection of Xterm rooms. ++ Each room is a unique SSH connection. ++ """ ++ ++ # The rooms dictionary stores all the active rooms. 
++ # Note: This implementation is only suitable for a single-process server. ++ # If you need to deploy a multi-process server, consider using a database, ++ # middleware, or a separate service to manage the rooms. ++ rooms = {} ++ ++ def __init__(self, sio: SocketIO) -> None: ++ """ ++ Initialize the XtermRooms instance. ++ ++ sio: The SocketIO instance used for communication. ++ """ ++ self.sio = sio ++ self.stop_event = threading.Event() ++ ++ def has(self, room_id: str) -> bool: ++ """ ++ Check if a room with the given ID exists and is active. ++ ++ room_id: The ID of the room to check. ++ ++ Returns: True if the room exists and is active, False otherwise. ++ """ ++ ++ room_info = self.rooms.get(room_id) ++ ++ if ( ++ not room_info ++ or not room_info["socket"].is_active ++ or room_info["conns"] < 1 ++ or not room_info["thread"].is_alive() ++ ): ++ self._del(room_id) ++ return False ++ ++ return True ++ ++ def send(self, room_id: str, data: str) -> bool: ++ """ ++ Sends data to the room with the given ID. ++ ++ room_id: The ID of the room to send data to. ++ data: The data to send. ++ ++ Returns: True if the operation is successful, False otherwise. ++ """ ++ ++ if not self.rooms.get(room_id): ++ return False ++ ++ self.rooms[room_id]["socket"].send(data) ++ return True ++ ++ def join(self, room_id: str, namespace: str = None, room_sock=None) -> bool: ++ """ ++ Join a room with the given ID. If the room does not exist, ++ create it. ++ ++ room_id: The ID of the room to join. ++ room_sock: The socket of the room to join. ++ If None, a new socket will be created. ++ ++ Returns: True if the operation is successful, False otherwise. ++ """ ++ if not self.rooms.get(room_id): ++ return self._add(room_id, namespace, room_sock) ++ ++ self.rooms[room_id]["conns"] += 1 ++ self.rooms[room_id]["socket"].send("") ++ return True ++ ++ def leave(self, room_id: str) -> bool: ++ """ ++ Leave a room with the given ID. If the room is empty after leaving, ++ delete it. ++ ++ room_id: The ID of the room to leave. ++ ++ Returns: True if the operation is successful, False otherwise. ++ """ ++ if not self.rooms.get(room_id) or self.rooms[room_id]["conns"] < 1: ++ return False ++ ++ self.rooms[room_id]["conns"] -= 1 ++ if self.rooms[room_id]["conns"] == 0: ++ return self._del(room_id) ++ ++ return True ++ ++ def resize(self, room_id: str, cols: int, rows: int) -> bool: ++ """ ++ Resizes the terminal size of the room with the given ID. ++ ++ room_id: The ID of the room to resize. ++ cols: The number of columns for the terminal. ++ rows: The number of rows for the terminal. ++ ++ Returns: True if the operation is successful, False otherwise. ++ """ ++ if not self.rooms.get(room_id): ++ return False ++ ++ self.rooms[room_id]["socket"].resize(cols, rows) ++ return True ++ ++ def _add(self, room_id: str, namespace: str, room_sock=None) -> bool: ++ """ ++ Add a new room with the given ID and socket. ++ ++ room_id: The ID of the room to add. ++ namespace: The namespace of the room to add. ++ room_sock: The socket of the room to add. ++ ++ Returns: True if the operation is successful, False otherwise. 
++ """ ++ ++ if self.rooms.get(room_id): ++ return False ++ ++ if not isinstance(room_sock, InteroperableSSH) or not room_sock.is_active: ++ return False ++ ++ self.rooms[room_id] = { ++ "socket": room_sock, ++ "conns": 1, ++ "thread": threading.Thread(target=self._bg_recv, args=(room_id, namespace)), ++ } ++ ++ self.rooms[room_id]["thread"].start() ++ ++ return True ++ ++ def _del(self, room_id: str) -> bool: ++ """ ++ Delete a room with the given ID. ++ ++ room_id: The ID of the room to delete. ++ ++ Returns: True if the operation is successful, False otherwise. ++ """ ++ room_info = self.rooms.get(room_id) ++ if not room_info: ++ return False ++ ++ try: ++ if room_info["socket"].is_active: ++ room_info["socket"].close() ++ except Exception as error: ++ LOGGER.error("Error while closing socket: %s", error) ++ # self.stop_event.set() # Set the event to signal thread termination ++ # self.rooms[room_id]["thread"].join() # Wait for the thread to finish ++ self.rooms.pop(room_id) ++ return True ++ ++ def _bg_recv(self, room_id: str, namespace: str): ++ """ ++ Continuously receive data from the room's socket in the background and ++ emit it to the room. ++ ++ room_id: The ID of the room to receive data from. ++ """ ++ while True: ++ if len(self.rooms) == 0: ++ break ++ is_active = self.rooms[room_id]["socket"].is_active ++ ++ if not is_active: ++ break ++ self.sio.emit( ++ "message", ++ self.rooms[room_id]["socket"].recv(), ++ namespace=namespace, ++ to=room_id, # Emit the received data to the room ++ ) +diff --git a/zeus/host_manager/view.py b/zeus/host_manager/view.py +index d13868c..f1cc399 100644 +--- a/zeus/host_manager/view.py ++++ b/zeus/host_manager/view.py +@@ -35,7 +35,8 @@ from vulcanus.restful.response import BaseResponse + from vulcanus.restful.serialize.validate import validate + from zeus.conf.constant import CERES_HOST_INFO, HOST_TEMPLATE_FILE_CONTENT, HostStatus + from zeus.database.proxy.host import HostProxy +-from zeus.database.table import Host ++from zeus.database.proxy.host_sync_status import HostSyncProxy ++from zeus.database.table import Host, HostSyncStatus + from zeus.function.model import ClientConnectArgs + from zeus.function.verify.host import ( + AddHostBatchSchema, +@@ -47,7 +48,8 @@ from zeus.function.verify.host import ( + GetHostInfoSchema, + GetHostSchema, + GetHostStatusSchema, +- UpdateHostSchema, ++ UpdateHostSchema, AddHostSyncStatusSchema, DeleteHostSyncStatusSchema, GetHostSyncStatusSchema, ++ DeleteAllHostSyncStatusSchema + ) + from zeus.host_manager.ssh import SSH, execute_command_and_parse_its_result, generate_key + +@@ -965,3 +967,109 @@ class UpdateHost(BaseResponse): + return self.response(code=state.PARAM_ERROR, message="please update password or authentication key.") + + return self.response(callback.update_host_info(params.pop("host_id"), params)) ++ ++ ++class AddHostSyncStatus(BaseResponse): ++ """ ++ Interface for add host sync status. 
++    Restful API: POST
++    """
++
++    def validate_host_sync_info(self, host_sync_info: dict) -> Tuple[int, dict]:
++        """
++        Query host sync status info and check whether the record already exists
++
++        Args:
++            host_sync_info (dict): e.g.
++                {
++                    "host_id": 1,
++                    "host_ip": "192.168.1.1",
++                    "domain_name": "aops",
++                    "sync_status": 0
++                }
++
++        Returns:
++            tuple:
++                status code, host sync status object
++        """
++        status, host_sync_status = self.proxy.get_host_sync_status(host_sync_info)
++        if status != state.SUCCEED:
++            return status, HostSyncStatus()
++
++        if host_sync_status is not None:
++            return state.DATA_EXIST, host_sync_status
++        return state.SUCCEED, {}
++
++    @BaseResponse.handle(schema=AddHostSyncStatusSchema, proxy=HostSyncProxy, token=False)
++    def post(self, callback: HostSyncProxy, **params):
++        """
++        Add host sync status
++
++        Args:
++            host_id (int): host id
++            host_ip (str): host ip
++            domain_name (str): domain name
++            sync_status (int): sync status
++
++        Returns:
++            dict: response body
++        """
++        self.proxy = callback
++
++        status, host_sync = self.validate_host_sync_info(params)
++        if status != state.SUCCEED:
++            return self.response(code=status)
++
++        status_code = self.proxy.add_host_sync_status(params)
++        return self.response(code=status_code)
++
++
++class DeleteHostSyncStatus(BaseResponse):
++    @BaseResponse.handle(schema=DeleteHostSyncStatusSchema, proxy=HostSyncProxy, token=False)
++    def post(self, callback: HostSyncProxy, **params):
++        """
++        Delete host sync status
++
++        Args:
++            host_id (int): host id
++            domain_name (str): domain name
++
++        Returns:
++            dict: response body
++        """
++        status_code = callback.delete_host_sync_status(params)
++        return self.response(code=status_code)
++
++
++class DeleteAllHostSyncStatus(BaseResponse):
++    @BaseResponse.handle(schema=DeleteAllHostSyncStatusSchema, proxy=HostSyncProxy, token=False)
++    def post(self, callback: HostSyncProxy, **params):
++        """
++        Delete host sync status in batches, or for a whole domain when host_ids is empty
++
++        Args:
++            host_ids (list): host id list
++            domain_name (str): domain name
++
++        Returns:
++            dict: response body
++        """
++        status_code = callback.delete_all_host_sync_status(params)
++        return self.response(code=status_code)
++
++
++class GetHostSyncStatus(BaseResponse):
++    @BaseResponse.handle(schema=GetHostSyncStatusSchema, proxy=HostSyncProxy, token=False)
++    def post(self, callback: HostSyncProxy, **params):
++        """
++        Get host sync status of all hosts in a domain
++
++        Args:
++            domain_name (str): domain name
++
++        Returns:
++            dict: response body
++        """
++        domain_name = params.get("domain_name")
++        status_code, result = callback.get_domain_host_sync_status(domain_name)
++        return self.response(code=status_code, data=result)
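
The four views above are registered in zeus/url.py (next hunk) under route constants. A usage sketch over HTTP; the paths follow the schema docstrings in zeus/function/verify/host.py, and the base address is a placeholder for the deployed zeus ip/port:

    import requests

    BASE = "http://127.0.0.1:11111"  # placeholder: use the configured zeus ip/port
    requests.post(f"{BASE}/manage/host/sync/status/add",
                  json={"host_id": 1, "host_ip": "192.168.1.1", "domain_name": "aops", "sync_status": 0})
    requests.post(f"{BASE}/manage/host/sync/status/get", json={"domain_name": "aops"})
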
+diff --git a/zeus/manage.py b/zeus/manage.py
+index 7aab56d..222cd3c 100644
+--- a/zeus/manage.py
++++ b/zeus/manage.py
+@@ -15,6 +15,7 @@ Time:
+ Author:
+ Description: Manager that start aops-zeus
+ """
++
+ try:
+     from gevent import monkey
+ 
+@@ -22,12 +23,51 @@ try:
+ except:
+     pass
+ 
+-from vulcanus import init_application
++from vulcanus import init_application, LOGGER
++from vulcanus.timed import TimedTaskManager
+ from zeus.conf import configuration
+ from zeus.url import URLS
++from zeus.conf.constant import TIMED_TASK_CONFIG_PATH
++from zeus.cron import task_meta
++from zeus.host_manager.terminal import socketio
++
++
++def _init_timed_task(application):
++    """
++    Initialize and create scheduled tasks from the timed task config file
++
++    Args:
++        application: flask.Application
++    """
++    timed_task = TimedTaskManager(app=application, config_path=TIMED_TASK_CONFIG_PATH)
++    if not timed_task.timed_config:
++        LOGGER.warning("If you want to start a scheduled task, please add a timed config.")
++        return
++
++    for task_info in timed_task.timed_config.values():
++        task_type = task_info.get("type")
++        if task_type not in task_meta:
++            continue
++        meta_class = task_meta[task_type]
++        timed_task.add_job(meta_class(timed_config=task_info))
++
++    timed_task.start()
++
+ 
+-app = init_application(name="zeus", settings=configuration, register_urls=URLS)
++def main():
++    _app = init_application(name="zeus", settings=configuration, register_urls=URLS)
++    socketio.init_app(app=_app)
++    _init_timed_task(application=_app)
++    return _app
++
++
++app = main()
+ 
+ if __name__ == "__main__":
+-    app.run(host=configuration.zeus.get('IP'), port=configuration.zeus.get('PORT'))
++    # socketio.run starts its own (gevent) WSGI server wrapping the app; a separate
++    # app.run call before it would block and the socket server would never start
++    socketio.run(
++        app,
++        host=configuration.zeus.get("IP"),
++        port=configuration.zeus.get("PORT"),
++    )
+diff --git a/zeus/url.py b/zeus/url.py
+index 5f00ef9..099b6b5 100644
+--- a/zeus/url.py
++++ b/zeus/url.py
+@@ -53,11 +53,21 @@ from zeus.conf.constant import (
+     SYNC_CONFIG,
+     OBJECT_FILE_CONFIG,
+     GET_HOST_STATUS,
++    BATCH_SYNC_CONFIG,
++    ADD_HOST_SYNC_STATUS,
++    DELETE_HOST_SYNC_STATUS,
++    GET_HOST_SYNC_STATUS,
++    CONF_TRACE_MGMT,
++    CONF_TRACE_DATA,
++    CONF_TRACE_QUERY,
++    CONF_TRACE_DELETE,
++    DELETE_ALL_HOST_SYNC_STATUS
+ )
+ from zeus.config_manager import view as config_view
+ from zeus.host_manager import view as host_view
+ from zeus.metric_manager import view as metric_view
+ from zeus.vulnerability_manage import view as vulnerability_view
++from zeus.conftrace_manage import view as conf_trace_view
+ 
+ URLS = []
+ 
+@@ -82,6 +92,10 @@ SPECIFIC_URLS = {
+         (host_view.GetHostInfo, QUERY_HOST_DETAIL),
+         (host_view.GetHostCount, GET_HOST_COUNT),
+         (host_view.GetHostTemplateFile, GET_HOST_TEMPLATE_FILE),
++        (host_view.AddHostSyncStatus, ADD_HOST_SYNC_STATUS),
++        (host_view.DeleteHostSyncStatus, DELETE_HOST_SYNC_STATUS),
++        (host_view.DeleteAllHostSyncStatus, DELETE_ALL_HOST_SYNC_STATUS),
++        (host_view.GetHostSyncStatus, GET_HOST_SYNC_STATUS)
+     ],
+     "HOST_GROUP_URLS": [
+         (host_view.AddHostGroup, ADD_GROUP),
+@@ -92,6 +106,7 @@ SPECIFIC_URLS = {
+         (config_view.CollectConfig, COLLECT_CONFIG),
+         (config_view.SyncConfig, SYNC_CONFIG),
+         (config_view.ObjectFileConfig, OBJECT_FILE_CONFIG),
++        (config_view.BatchSyncConfig, BATCH_SYNC_CONFIG)
+     ],
+     'AGENT_URLS': [
+         (agent_view.AgentPluginInfo, AGENT_PLUGIN_INFO),
+@@ -111,6 +126,12 @@ SPECIFIC_URLS = {
+         (metric_view.QueryHostMetricData, QUERY_METRIC_DATA),
+         (metric_view.QueryHostMetricList, QUERY_METRIC_LIST),
+     ],
++    'CONF_TRACE_URLS': [
++        (conf_trace_view.ConfTraceMgmt, CONF_TRACE_MGMT),
++        (conf_trace_view.ConfTraceData, CONF_TRACE_DATA),
++        (conf_trace_view.ConfTraceQuery, CONF_TRACE_QUERY),
++        (conf_trace_view.ConfTraceDataDelete, CONF_TRACE_DELETE),
++    ]
+ }
+ 
+ for _, value in SPECIFIC_URLS.items():
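
The reworked entry point hands the WSGI server to Flask-SocketIO. A standalone sketch of the same pattern (gevent async mode, as configured in terminal.py; host and port are example values):

    from flask import Flask
    from flask_socketio import SocketIO

    app = Flask(__name__)
    socketio = SocketIO(app, async_mode="gevent", cors_allowed_origins="*")

    if __name__ == "__main__":
        # socketio.run starts the gevent WSGI server itself, so app.run must not be called first
        socketio.run(app, host="127.0.0.1", port=5000)
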
+diff --git a/zeus/utils/__init__.py b/zeus/utils/__init__.py
+new file mode 100644
+index 0000000..4b94fcf
+--- /dev/null
++++ b/zeus/utils/__init__.py
+@@ -0,0 +1,18 @@
++#!/usr/bin/python3
++# ******************************************************************************
++# Copyright (C) 2023 isoftstone Technologies Co., Ltd. All rights reserved.
++# licensed under the Mulan PSL v2.
++# You can use this software according to the terms and conditions of the Mulan PSL v2.
++# You may obtain a copy of Mulan PSL v2 at:
++#     http://license.coscl.org.cn/MulanPSL2
++# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
++# PURPOSE.
++# See the Mulan PSL v2 for more details.
++# ******************************************************************************/
++"""
++@FileName: __init__.py
++@Time: 2024/1/22 11:42
++@Author: JiaoSiMao
++Description:
++"""
+diff --git a/zeus/utils/conf_tools.py b/zeus/utils/conf_tools.py
+new file mode 100644
+index 0000000..4b9d073
+--- /dev/null
++++ b/zeus/utils/conf_tools.py
+@@ -0,0 +1,55 @@
++#!/usr/bin/python3
++# ******************************************************************************
++# Copyright (C) 2023 isoftstone Technologies Co., Ltd. All rights reserved.
++# licensed under the Mulan PSL v2.
++# You can use this software according to the terms and conditions of the Mulan PSL v2.
++# You may obtain a copy of Mulan PSL v2 at:
++#     http://license.coscl.org.cn/MulanPSL2
++# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
++# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
++# PURPOSE.
++# See the Mulan PSL v2 for more details.
++# ******************************************************************************/
++"""
++@FileName: conf_tools.py
++@Time: 2024/1/22 13:38
++@Author: JiaoSiMao
++Description:
++"""
++import ast
++import configparser
++import os
++
++from zeus.conf import MANAGER_CONFIG_PATH
++from zeus.conf.constant import DOMAIN_LIST_API, EXPECTED_CONFS_API, DOMAIN_CONF_DIFF_API
++
++
++class ConfTools(object):
++
++    @staticmethod
++    def load_url_by_conf():
++        """
++        Load the urls used for config sync status updates from zeus.ini
++        """
++        cf = configparser.ConfigParser()
++        if os.path.exists(MANAGER_CONFIG_PATH):
++            cf.read(MANAGER_CONFIG_PATH, encoding="utf-8")
++        else:
++            parent = os.path.dirname(os.path.realpath(__file__))
++            conf_path = os.path.join(parent, "../config/zeus.ini")
++            cf.read(conf_path, encoding="utf-8")
++
++        # the ini value is quoted (e.g. "http://127.0.0.1"), so literal_eval strips the quotes
++        update_sync_status_address = ast.literal_eval(cf.get("update_sync_status", "update_sync_status_address"))
++        update_sync_status_port = str(cf.get("update_sync_status", "update_sync_status_port"))
++        domain_list_url = "{address}:{port}{api}".format(
++            address=update_sync_status_address, port=update_sync_status_port, api=DOMAIN_LIST_API
++        )
++        expected_confs_url = "{address}:{port}{api}".format(
++            address=update_sync_status_address, port=update_sync_status_port, api=EXPECTED_CONFS_API
++        )
++        domain_conf_diff_url = "{address}:{port}{api}".format(
++            address=update_sync_status_address, port=update_sync_status_port, api=DOMAIN_CONF_DIFF_API
++        )
++
++        url = {
++            "domain_list_url": domain_list_url,
++            "expected_confs_url": expected_confs_url,
++            "domain_conf_diff_url": domain_conf_diff_url,
++        }
++        return url
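
A usage sketch for the helper above; the concrete API paths come from the DOMAIN_*_API constants in zeus.conf.constant, so only the shape of the result is shown:

    from zeus.utils.conf_tools import ConfTools

    urls = ConfTools.load_url_by_conf()
    # with the [update_sync_status] section from conf/zeus.ini this yields, e.g.
    # {"domain_list_url": "http://127.0.0.1:11114<DOMAIN_LIST_API>", ...}
    print(urls["domain_list_url"])
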
diff --git a/aops-zeus.spec b/aops-zeus.spec
index 3f9f5eb..cf0dcae 100644
--- a/aops-zeus.spec
+++ b/aops-zeus.spec
@@ -1,6 +1,6 @@
 Name: aops-zeus
 Version: v1.4.0
-Release: 7
+Release: 9
 Summary: A host and user manager service which is the foundation of aops.
 License: MulanPSL2
 URL: https://gitee.com/openeuler/%{name}
@@ -16,7 +16,7 @@
 Patch0008: 0008-check-host-status-when-query-host-detail.patch
 Patch0009: 0009-fix-error-log-when-query-host-status.patch
 Patch0010: 0010-update-the-exception-catching-type-of-the-function.patch
 Patch0011: 0011-fix-command-injection-vulnerabilities.patch
-
+Patch0012: 0012-conf-trace-info-and-conf-sync-optimize.patch
 BuildRequires: python3-setuptools
 Requires: aops-vulcanus >= v1.3.0
@@ -43,19 +43,27 @@ A host and user manager service which is the foundation of aops.
 %py3_install
 mkdir -p %{buildroot}/opt/aops/
 cp -r database %{buildroot}/opt/aops/
-
+cp -r ansible_task %{buildroot}/opt/aops/
 
 %files
 %doc README.*
 %attr(0644,root,root) %{_sysconfdir}/aops/zeus.ini
+%attr(0644,root,root) %{_sysconfdir}/aops/zeus_crontab.yml
 %attr(0755,root,root) %{_bindir}/aops-zeus
 %attr(0755,root,root) /usr/lib/systemd/system/aops-zeus.service
 %{python3_sitelib}/aops_zeus*.egg-info
 %{python3_sitelib}/zeus/*
 %attr(0755, root, root) /opt/aops/database/*
+%attr(0755, root, root) /opt/aops/ansible_task/*
 
 %changelog
+* Mon Jul 01 2024 smjiao - v1.4.0-9
+- file trace interface
+
+* Mon Jul 01 2024 smjiao - v1.4.0-8
+- conf trace sync interface optimization
+
 * Thu Mar 07 2024 wenxin - v1.4.0-7
 - fix command injection vulnerabilities
@@ -76,7 +84,7 @@ cp -r database %{buildroot}/opt/aops/
 * Mon Dec 18 2023 wenxin - v1.4.0-2
 - Add interface for detecting host status.
 - Update query host list api, add a new query method based on host name for it.
-- Add rollback task execution method. 
+- Add rollback task execution method.
 - Fix cve scan callback error.
 
 * Tue Dec 12 2023 wenxin - v1.4.0-1
@@ -95,7 +103,7 @@ cp -r database %{buildroot}/opt/aops/
 * Wed Oct 18 2023 wenxin - v1.3.1-2
 - fix bug: metric proxy init failed
 - add a way about key authentication for add host api
-- remove python3-prometheus-api-client 
+- remove python3-prometheus-api-client
 
 * Thu Sep 21 2023 wenxin - v1.3.1-1
 - update spec requires
@@ -129,11 +137,11 @@ cp -r database %{buildroot}/opt/aops/
 * Thu Apr 27 2023 wenixn - v1.2.0-2
 - Fix token is not invalidated after the token was refreshed
 - update args validation rules for add account and for add host
-- replace thread scheduling with gevent scheduling when add host by batch 
+- replace thread scheduling with gevent scheduling when add host by batch
 
 * Mon Apr 17 2023 wenixn - v1.2.0-1
 - update the call method of ceres; add function how to add host from web
-- add api: update host info 
+- add api: update host info
 
 * Tue Dec 27 2022 wenxin - v1.1.1-4
 - Modify uwsgi configuration file fields
@@ -154,3 +162,4 @@ cp -r database %{buildroot}/opt/aops/
 
 * Tue Nov 22 2022 zhuyuncheng - v1.0.0-1
 - Package init
+
-- 
Gitee