代码拉取完成,页面将自动刷新
# -*- coding: UTF-8 -*-
"""
工作线程管理模块
本模块负责文件/目录的同步细节,以及管控等
"""
import time
import fs_global as Global
from fs_data import TaskQueue, RetryQueue
from fs_logger import Logger
from fs_data import ConfigWrapper
from fs_util import ThreadPool, MyThreading, Common, Counter, Singleton
class WarnExcept(Exception):
""" warn级别异常 """
pass
class ErrorExcept(Exception):
""" error级别异常 """
pass
class Slaves(Singleton):
""" 工作线程管理类 """
def __init__(self, count):
self.count = count
self.pool = None
self.curr_listen = None
self.last_listen = None
self.retry_period = 60
self.check_period = 10
self.worker_period = 1
self.syncing = []
self.ready_flag = False
def init(self):
""" 重写基类的init, 用于避免使用signal机制,统一由Master调度 """
pass
def find_listen(self, task):
"""
找到task对应配置文件中监听的目录:
先从当前配置文件数据字典中找,
如果没找到再到reload之前的配置文件数据中找,
参数:
1. 具体任务
返回值:
任务所在的监听目录;
是否是上一次的配置文件数据
"""
_contain = Common.is_contain
for listen in self.curr_listen:
if not _contain(listen, task):
continue
else:
return listen, False
# 继续从reload之前的配置中寻找
for listen in self.last_listen:
if not _contain(listen, task):
continue
else:
return listen, True
return None, None
def combine(self, thread_id, task):
"""
组合rsync同步参数
通过task找到对应的监听目录后,找到该监听目录对应的其他配置参数
参数:
task: 具体任务
返回值:
rsync同步命令字符串
"""
# 如果是临时目录或文件,同步可能会失败,需要判断一下目录或文件在不在
if not Common.is_exists(task):
raise WarnExcept("is not exist, ignore...")
listen, last = self.find_listen(task)
if not listen:
raise ErrorExcept("not in config ini, ignore...")
if last:
Logger.warn('[%s] %s in last config section %s' % (thread_id, task, listen))
_get_listen_value = ConfigWrapper.get_key_value
remote_mkdir = _get_listen_value('make_remote_dir', last=last)
delete = _get_listen_value('delete_not_in_src', last=last)
remote_ips = _get_listen_value('remote_ip', listen, last).split(',')
checksum = _get_listen_value('checksum', listen, last)
compress = _get_listen_value('compress', listen, last)
exclude = _get_listen_value('exclude', listen, last)
task_dir, task_file = Common.split_path(task)
param = "%s -a" % Global.G_RSYNC_TOOL
if checksum == 'true':
param += 'c'
if compress == 'true':
param += 'z'
if exclude:
# 只有一个过滤条件时,不能用{},否则过滤会失效
if len(exclude.split(',')) == 1:
param += ' --exclude=%s' % exclude
else:
param += ' --exclude={%s}' % exclude
if delete == 'true':
param += ' --delete'
param += ' --rsh=ssh'
cmd_dict = {}
""" 判断IP是否可达 """
for remote_ip in remote_ips:
if remote_ip not in Global.G_CONNECT_IP_LIST:
Logger.warn("[%s] %s is unavailable IP, ignore %s" % (thread_id, remote_ip, task))
continue
# 注:任务可能是文件也可能是目录
# 统一取上一层目录,进入后同步
_param = "%s %s %s@%s:%s" % (param, task_file, Global.G_RSYNC_USER, remote_ip, task_dir)
# 如果full_sync为false或者其他场景下,同步可能会因对端的目录不存在而报错
# 这里根据make_remote_dir配置,判断是否先登录对端创建该目录,开启会影响同步性能
prev_cmd = None
if remote_mkdir == 'true':
prev_cmd = "ssh %s@%s 'mkdir -p %s'" % (Global.G_RSYNC_USER, remote_ip, task_dir)
cmd = "cd %s && %s" % (task_dir, _param)
if prev_cmd:
cmd = ';'.join([prev_cmd, cmd])
cmd_dict[remote_ip] = cmd
return cmd_dict
@Counter
def rsync(self, thread_id, cmd):
"""
同步动作函数
使用rsync同步文件或目录;
使用Counter装饰器包装,用于计算耗时;
返回值:(Counter中封装)
ret: 退出值
detail:命令执行结构详细输出信息
"""
Logger.debug("[%s] %s" % (thread_id, cmd))
ret, out, err = Common.shell_cmd(cmd)
return ret, err
def doing(self, thread_id, task, is_retry):
""" 先组装同步参数再执行同步 """
ret, detail = -1, None
self.syncing.append(task)
try:
cmd_dict = self.combine(thread_id, task)
size = Common.get_size(task)
# 执行同步动作
for ip, cmd in cmd_dict.items():
ret, detail = self.rsync(thread_id, cmd)
detail = "To %s, Size %s, %s" % (ip, size, detail)
# 0表示成功
if not ret:
Logger.info("[%s] sync success %s, %s" % (thread_id, task, detail))
else:
info = "[%s] sync failed %s, %s" % (thread_id, task, detail)
if is_retry:
Logger.error(info)
else:
Logger.warn(info)
# 失败且不是失败重传任务时,放入失败队列进行重试
RetryQueue.push_task(task)
except WarnExcept as e:
Logger.warn("[%s] WarnExcept %s %s" % (thread_id, task, e))
except ErrorExcept as e:
Logger.error("[%s] ErrorExcept %s %s" % (thread_id, task, e))
finally:
self.syncing.remove(task)
def deal(self, thread_id, task_list, is_retry=False):
"""
同步任务处理函数
在一些极端场景下会出现多个线程同时同步同一个文件或目录,
因此任务消费时记录正在同步的任务,同步前判断该任务是否正在消费,
有消费冲撞时,将任务暂存,待获取的所有任务消费完后再尝试同步.
参数:
1. thread_id: 线程id
2. task_list: 该线程获取的任务列表
返回值:None
"""
# 用于暂存冲突的task
collision = []
for task in task_list:
# Logger.info("[%s] deal %s" % (thread_id, task))
if task not in self.syncing:
self.doing(thread_id, task, is_retry)
continue
# task同步冲突时,暂存冲突的task,防止与其他线程重复同步
Logger.debug("[%s] %s crash syncing" % (thread_id, task))
collision.append(task)
# 处理上一个循环中加入的冲突task
if collision:
self.crash_handle(thread_id, collision, is_retry)
def crash_handle(self, thread_id, tasks, is_retry):
"""
冲突任务处理
每2秒循环检测一次冲突任务, 最多检测5分钟
"""
for t in range(150):
t += 1
time.sleep(2)
oks = []
for task in tasks:
if task in self.syncing:
Logger.debug("[%s] %s syncing still, waited %ss" % (thread_id, task, t*2))
continue
self.doing(thread_id, task, is_retry)
oks.append(task)
# 每一轮都从冲突任务中清理掉重新尝试同步成功的任务 #
for o in oks:
tasks.remove(o)
if not tasks:
return
Logger.info("[%s] after crash_handle ignored over-time crash tasks: %s" % (thread_id, tasks))
def require(self, args=None):
"""
线程池处理函数
请求同步任务并进行处理
参数:
args: 暂只包含线程id
返回值: None
"""
thread_id, = args
thread_id = 'thread-worker-%s' % thread_id
task_list = TaskQueue.request_tesk()
if not task_list:
return
Logger.info("[%s] got %s tasks:\n%s" % (thread_id, len(task_list), '\n'.join(task_list)))
self.set_config_data()
self.deal(thread_id, task_list)
def set_config_data(self):
_curr = ConfigWrapper.get_listen_path(last=False)
if self.curr_listen != _curr:
self.curr_listen = _curr
self.last_listen = ConfigWrapper.get_listen_path(last=True)
def wait_for_ready(self):
while 1:
if self.ready_flag:
return
time.sleep(1)
def retry_process(self, args=None):
"""
失败重传线程处理函数
小周期定时任务
失败重传线程从失败重传队列中获取任务进行重传处理
参数:None
返回值:None
"""
task_list = RetryQueue.request_task()
if not task_list:
return
self.set_config_data()
self.deal('thread-retryer', task_list, True)
def connect_check(self, args=None):
"""
检验各个监听目录对应的对端IP是否可达
"""
thread_id, _tmp_ip = 'thread-checker', []
for last in [False, True]:
for listen in ConfigWrapper.get_listen_path(last=last):
ip_list = ConfigWrapper.get_key_value(key='remote_ip', section=listen, last=last).split(',')
for ip in ip_list:
if not Common.is_ip(ip):
Logger.warn("[%s] IP of %s is invalid:%s, last:%s" % (thread_id, listen, ip, last))
continue
if ip not in _tmp_ip:
_tmp_ip.append(ip)
""" 保存正常连接的IP """
for ip, result in Common.batch_ping(_tmp_ip).items():
if not result:
Logger.warn('[%s] %s is disconnect' % (thread_id, ip))
if ip in Global.G_CONNECT_IP_LIST:
Global.G_CONNECT_IP_LIST.remove(ip)
elif ip not in Global.G_CONNECT_IP_LIST:
Global.G_CONNECT_IP_LIST.append(ip)
self.ready_flag = True
Logger.info('[%s] after check connect G_CONNECT_IP_LIST=%s' % (thread_id, Global.G_CONNECT_IP_LIST))
def fully_sync(self, args=None):
"""
大周期定时任务
负责全量数据同步(只做当前配置文件的全量同步)
注:首次启动时,需要等待IP检测(connect_check)完,
否则首次全量数据同步会误认为对端IP不可用
"""
self.wait_for_ready()
task_list = []
for listen in ConfigWrapper.get_listen_path():
sync_all_switch = ConfigWrapper.get_key_value(key='full_sync', section=listen)
if sync_all_switch == 'false':
continue
task_list.append(listen)
self.set_config_data()
# 全量同步不进行失败重传
self.deal('thread-fullsync', task_list, True)
def start_worker(self):
""" 启动任务处理工作线程 """
self.pool = ThreadPool(func=self.require, period=self.worker_period)
self.pool.init(self.count)
self.pool.start()
def start_retry(self):
""" 启动失败重传任务线程 """
MyThreading(func=self.retry_process, period=self.retry_period).start()
def start_checker(self):
""" 启动链路状态检测任务线程 """
MyThreading(func=self.connect_check, period=self.check_period).start()
def start_fullsync(self):
""" 启动全量数据同步任务线程 """
period = float(ConfigWrapper.get_key_value('fullsync_period'))
MyThreading(func=self.fully_sync, period=period).start()
def start(self):
self.start_checker()
self.start_worker()
self.start_retry()
self.start_fullsync()
def status(self):
return self.syncing, Global.G_CONNECT_IP_LIST
def pause(self):
if self.pool:
self.pool.pause()
def resume(self):
if self.pool:
self.pool.resume()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。