代码拉取完成,页面将自动刷新
# -*- coding: UTF-8 -*-
"""
工作线程管理模块
本模块负责文件/目录的同步细节,以及管控等
"""
import time
import fs_global as Global
from fs_data import TaskQueue, RetryQueue
from fs_logger import Logger
from fs_data import ConfigWrapper
from fs_util import ThreadPool, MyThreading, Common, Counter, Singleton
class WarnExcept(Exception):
    """ warn级别异常 """
    pass
class ErrorExcept(Exception):
    """ error级别异常 """
    pass
class Slaves(Singleton):
    """ 工作线程管理类 """
    def __init__(self, count):
        self.count = count
        self.pool = None
        self.curr_listen = None
        self.last_listen = None
        self.retry_period = 60
        self.check_period = 10
        self.worker_period = 1
        self.syncing = []
        self.ready_flag = False
    def init(self):
        """ 重写基类的init, 用于避免使用signal机制,统一由Master调度 """
        pass
    def find_listen(self, task):
        """
        找到task对应配置文件中监听的目录:
        先从当前配置文件数据字典中找,
        如果没找到再到reload之前的配置文件数据中找,
        参数:
            1. 具体任务
        返回值:
            任务所在的监听目录;
            是否是上一次的配置文件数据
        """
        _contain = Common.is_contain
        for listen in self.curr_listen:
            if not _contain(listen, task):
                continue
            else:
                return listen, False
        # 继续从reload之前的配置中寻找
        for listen in self.last_listen:
            if not _contain(listen, task):
                continue
            else:
                return listen, True
        return None, None
    def combine(self, thread_id, task):
        """
        组合rsync同步参数
        通过task找到对应的监听目录后,找到该监听目录对应的其他配置参数
        参数:
            task: 具体任务
        返回值:
            rsync同步命令字符串
        """
        # 如果是临时目录或文件,同步可能会失败,需要判断一下目录或文件在不在
        if not Common.is_exists(task):
            raise WarnExcept("is not exist, ignore...")
        listen, last = self.find_listen(task)
        if not listen:
            raise ErrorExcept("not in config ini, ignore...")
        if last:
            Logger.warn('[%s] %s in last config section %s' % (thread_id, task, listen))
        _get_listen_value = ConfigWrapper.get_key_value
        remote_mkdir = _get_listen_value('make_remote_dir', last=last)
        delete = _get_listen_value('delete_not_in_src', last=last)
        remote_ips = _get_listen_value('remote_ip', listen, last).split(',')
        checksum = _get_listen_value('checksum', listen, last)
        compress = _get_listen_value('compress', listen, last)
        exclude = _get_listen_value('exclude', listen, last)
        
        task_dir, task_file = Common.split_path(task)
        param = "%s -a" % Global.G_RSYNC_TOOL
        if checksum == 'true':
            param += 'c'
        if compress == 'true':
            param += 'z'
        if exclude:
            # 只有一个过滤条件时,不能用{},否则过滤会失效
            if len(exclude.split(',')) == 1:
                param += ' --exclude=%s' % exclude
            else:
                param += ' --exclude={%s}' % exclude
        if delete == 'true':
            param += ' --delete'
        param += ' --rsh=ssh'
        cmd_dict = {}
        """ 判断IP是否可达 """
        for remote_ip in remote_ips:
            if remote_ip not in Global.G_CONNECT_IP_LIST:
                Logger.warn("[%s] %s is unavailable IP, ignore %s" % (thread_id, remote_ip, task))
                continue
            # 注:任务可能是文件也可能是目录
            # 统一取上一层目录,进入后同步
            _param = "%s %s %s@%s:%s" % (param, task_file, Global.G_RSYNC_USER, remote_ip, task_dir)
            # 如果full_sync为false或者其他场景下,同步可能会因对端的目录不存在而报错
            # 这里根据make_remote_dir配置,判断是否先登录对端创建该目录,开启会影响同步性能
            prev_cmd = None
            if remote_mkdir == 'true':
                prev_cmd = "ssh %s@%s 'mkdir -p %s'" % (Global.G_RSYNC_USER, remote_ip, task_dir)
            cmd = "cd %s && %s" % (task_dir, _param)
            if prev_cmd:
                cmd = ';'.join([prev_cmd, cmd])
            cmd_dict[remote_ip] = cmd
        return cmd_dict
    @Counter
    def rsync(self, thread_id, cmd):
        """
        同步动作函数
        使用rsync同步文件或目录;
        使用Counter装饰器包装,用于计算耗时;
        返回值:(Counter中封装)
            ret:   退出值
            detail:命令执行结构详细输出信息
        """
        Logger.debug("[%s] %s" % (thread_id, cmd))
        ret, out, err = Common.shell_cmd(cmd)
        return ret, err
    def doing(self, thread_id, task, is_retry):
        """ 先组装同步参数再执行同步 """
        ret, detail = -1, None
        self.syncing.append(task)
        try:
            cmd_dict = self.combine(thread_id, task)
            size = Common.get_size(task)
            # 执行同步动作
            for ip, cmd in cmd_dict.items():
                ret, detail = self.rsync(thread_id, cmd)
                detail = "To %s, Size %s, %s" % (ip, size, detail)
                # 0表示成功
                if not ret:
                    Logger.info("[%s] sync success %s, %s" % (thread_id, task, detail))
                else:
                    info = "[%s] sync failed %s, %s" % (thread_id, task, detail)
                    if is_retry:
                        Logger.error(info)
                    else:
                        Logger.warn(info)
                        # 失败且不是失败重传任务时,放入失败队列进行重试
                        RetryQueue.push_task(task)
        except WarnExcept as e:
            Logger.warn("[%s] WarnExcept %s %s" % (thread_id, task, e))
        except ErrorExcept as e:
            Logger.error("[%s] ErrorExcept %s %s" % (thread_id, task, e))
        finally:
            self.syncing.remove(task)
    def deal(self, thread_id, task_list, is_retry=False):
        """
        同步任务处理函数
        在一些极端场景下会出现多个线程同时同步同一个文件或目录,
        因此任务消费时记录正在同步的任务,同步前判断该任务是否正在消费,
        有消费冲撞时,将任务暂存,待获取的所有任务消费完后再尝试同步.
        参数:
            1. thread_id: 线程id
            2. task_list: 该线程获取的任务列表
        返回值:None
        """
        # 用于暂存冲突的task
        collision = []
        for task in task_list:
            # Logger.info("[%s] deal %s" % (thread_id, task))
            if task not in self.syncing:
                self.doing(thread_id, task, is_retry)
                continue
            # task同步冲突时,暂存冲突的task,防止与其他线程重复同步
            Logger.debug("[%s] %s crash syncing" % (thread_id, task))
            collision.append(task)
        # 处理上一个循环中加入的冲突task
        if collision:
            self.crash_handle(thread_id, collision, is_retry)
    def crash_handle(self, thread_id, tasks, is_retry):
        """
        冲突任务处理
        每2秒循环检测一次冲突任务, 最多检测5分钟
        """
        for t in range(150):
            t += 1
            time.sleep(2)
            oks = []
            for task in tasks:
                if task in self.syncing:
                    Logger.debug("[%s] %s syncing still, waited %ss" % (thread_id, task, t*2))
                    continue
                self.doing(thread_id, task, is_retry)
                oks.append(task)
            # 每一轮都从冲突任务中清理掉重新尝试同步成功的任务 #
            for o in oks:
                tasks.remove(o)
            if not tasks:
                return
        Logger.info("[%s] after crash_handle ignored over-time crash tasks: %s" % (thread_id, tasks))
    def require(self, args=None):
        """
        线程池处理函数
        请求同步任务并进行处理
        参数:
            args: 暂只包含线程id
        返回值: None
        """
        thread_id, = args
        thread_id = 'thread-worker-%s' % thread_id
        task_list = TaskQueue.request_tesk()
        if not task_list:
            return
        Logger.info("[%s] got %s tasks:\n%s" % (thread_id, len(task_list), '\n'.join(task_list)))
        self.set_config_data()
        self.deal(thread_id, task_list)
    def set_config_data(self):
        _curr = ConfigWrapper.get_listen_path(last=False)
        if self.curr_listen != _curr:
            self.curr_listen = _curr
            self.last_listen = ConfigWrapper.get_listen_path(last=True)
    def wait_for_ready(self):
        while 1:
            if self.ready_flag:
                return
            time.sleep(1)
    def retry_process(self, args=None):
        """
        失败重传线程处理函数
        小周期定时任务
        失败重传线程从失败重传队列中获取任务进行重传处理
        参数:None
        返回值:None
        """
        task_list = RetryQueue.request_task()
        if not task_list:
            return
        self.set_config_data()
        self.deal('thread-retryer', task_list, True)
    def connect_check(self, args=None):
        """
        检验各个监听目录对应的对端IP是否可达
        """
        thread_id, _tmp_ip = 'thread-checker', []
        for last in [False, True]:
            for listen in ConfigWrapper.get_listen_path(last=last):
                ip_list = ConfigWrapper.get_key_value(key='remote_ip', section=listen, last=last).split(',')
                for ip in ip_list:
                    if not Common.is_ip(ip):
                        Logger.warn("[%s] IP of %s is invalid:%s, last:%s" % (thread_id, listen, ip, last))
                        continue
                    if ip not in _tmp_ip:
                        _tmp_ip.append(ip)
        """ 保存正常连接的IP """
        for ip, result in Common.batch_ping(_tmp_ip).items():
            if not result:
                Logger.warn('[%s] %s is disconnect' % (thread_id, ip))
                if ip in Global.G_CONNECT_IP_LIST:
                    Global.G_CONNECT_IP_LIST.remove(ip)
            elif ip not in Global.G_CONNECT_IP_LIST:
                Global.G_CONNECT_IP_LIST.append(ip)
        self.ready_flag = True
        Logger.info('[%s] after check connect G_CONNECT_IP_LIST=%s' % (thread_id, Global.G_CONNECT_IP_LIST))
    def fully_sync(self, args=None):
        """
        大周期定时任务
        负责全量数据同步(只做当前配置文件的全量同步)
        注:首次启动时,需要等待IP检测(connect_check)完,
        否则首次全量数据同步会误认为对端IP不可用
        """
        self.wait_for_ready()
        task_list = []
        for listen in ConfigWrapper.get_listen_path():
            sync_all_switch = ConfigWrapper.get_key_value(key='full_sync', section=listen)
            if sync_all_switch == 'false':
                continue
            task_list.append(listen)
        self.set_config_data()
        # 全量同步不进行失败重传
        self.deal('thread-fullsync', task_list, True)
    def start_worker(self):
        """ 启动任务处理工作线程 """
        self.pool = ThreadPool(func=self.require, period=self.worker_period)
        self.pool.init(self.count)
        self.pool.start()
    def start_retry(self):
        """ 启动失败重传任务线程 """
        MyThreading(func=self.retry_process, period=self.retry_period).start()
    def start_checker(self):
        """ 启动链路状态检测任务线程 """
        MyThreading(func=self.connect_check, period=self.check_period).start()
    def start_fullsync(self):
        """ 启动全量数据同步任务线程 """
        period = float(ConfigWrapper.get_key_value('fullsync_period'))
        MyThreading(func=self.fully_sync, period=period).start()
    def start(self):
        self.start_checker()
        self.start_worker()
        self.start_retry()
        self.start_fullsync()
    def status(self):
        return self.syncing, Global.G_CONNECT_IP_LIST
    def pause(self):
        if self.pool:
            self.pool.pause()
    def resume(self):
        if self.pool:
            self.pool.resume()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。