From e31486df60c57e355cdd5219d7c0bd43258acfe4 Mon Sep 17 00:00:00 2001 From: JohnSaxon Date: Wed, 18 Oct 2017 19:07:52 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E6=9B=B4=E6=96=B0=20cheungssh=5Fthread=5Fq?= =?UTF-8?q?ueue.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cheungssh/cheungssh_thread_queue.py | 190 +++++++++--------- 1 file changed, 95 insertions(+), 95 deletions(-) diff --git a/mysite/mysite/cheungssh/cheungssh_thread_queue.py b/mysite/mysite/cheungssh/cheungssh_thread_queue.py index 62a2135..050449c 100644 --- a/mysite/mysite/cheungssh/cheungssh_thread_queue.py +++ b/mysite/mysite/cheungssh/cheungssh_thread_queue.py @@ -1,95 +1,95 @@ -#!/usr/bin/env python -#coding:utf-8 -import threading,Queue,time -from cheungssh_auto_thread import AutoGetThreadNum -from cheungssh_modul_controler import CheungSSHControler -from cheungssh_error import CheungSSHError -import os,sys,json -os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings") -sys.path.append('/home/cheungssh/mysite') -from django.core.cache import cache - - -class CheungSSHThreadAdmin(object): - - def __init__(self): - self.REDIS=cache.master_client - def run(self,parameter={}): - cheungssh_info={"status":True,"content":""} - try: - task_type=parameter["task_type"] - tid=parameter["tid"] - multi_thread=parameter["multi_thread"] - if not type(multi_thread) ==type(False):raise CheungSSHError("CHB0000000010") - if task_type=="cmd": - cmd=parameter["cmd"] - servers=parameter["servers"] - if not type(servers)==type([]):raise CheungSSHError("CHB0000000011") - total="total.%s" % tid - current="current.%s" %tid - self.REDIS.set(total,len(servers)) - self.REDIS.set(current,0) - #CheungSSHConnector.progress[total]=len(servers) - #CheungSSHConnector.progress[current]=0 - if multi_thread: - - pool=CheungSSHPool() - for s in servers: - controler=CheungSSHControler() - param={"cmd":cmd,"sid":s,"tid":tid} - pool.add_task(controler.command_controler,param) - - else: - - for s in servers: - controler=CheungSSHControler() - controler.command_controler(cmd=cmd,sid=s,tid=tid) - - elif task_type=="file": - pass - else: - raise CheungSSHError("CHB0000000009") - except Exception,e: - cheungssh_info["content"]=str(e) - cheungssh_info["status"]=False - print cheungssh_info - - return cheungssh_info - -class CheungSSHThread(threading.Thread): - def __init__(self,queue): - threading.Thread.__init__(self) - self.queue=queue - self.daemon=True - self.start() - def run(self): - while True: - try: - func,kws=self.queue.get() - func(**kws) - except Exception,e: - print e - pass - self.queue.task_done() -class CheungSSHPool(AutoGetThreadNum): - - def __init__(self): - AutoGetThreadNum.__init__(self) - self.thread_num=self.auto_thread() - self.queue=Queue.Queue(self.thread_num) - for i in range(self.thread_num):#循环多少次,实际上是一个数字 - CheungSSHThread(self.queue) - def add_task(self,func,dict): - self.queue.put((func,dict))#把参数和函数,放到队列里面去,然后,有一个run会来这里取的 - def all_complete(self): - self.queue.join() - -def test_func(**kws): - time.sleep(0.5) - print "哈哈,这里是你穿的参数",kws - -if __name__=='__main__': - p=CheungSSHPool() - for i in range(20): - p.add_task(test_func,{"username":i}) - p.all_complete() +#!/usr/bin/env python +#coding:utf-8 +import threading,Queue,time +from cheungssh_auto_thread import AutoGetThreadNum +from cheungssh_modul_controler import CheungSSHControler +from cheungssh_error import CheungSSHError +import os,sys,json +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings") +sys.path.append('/home/cheungssh/mysite') +from django.core.cache import cache + + +class CheungSSHThreadAdmin(object): + + def __init__(self): + self.REDIS=cache.master_client + def run(self,parameter={}): + cheungssh_info={"status":True,"content":""} + try: + task_type=parameter["task_type"] + tid=parameter["tid"] + multi_thread=parameter["multi_thread"] + if not type(multi_thread) ==type(False):raise CheungSSHError("CHB0000000010") + if task_type=="cmd": + cmd=parameter["cmd"] + servers=parameter["servers"] + if not type(servers)==type([]):raise CheungSSHError("CHB0000000011") + total="total.%s" % tid + current="current.%s" %tid + self.REDIS.set(total,len(servers)) + self.REDIS.set(current,0) + #CheungSSHConnector.progress[total]=len(servers) + #CheungSSHConnector.progress[current]=0 + if multi_thread: + + pool=CheungSSHPool() + for s in servers: + controler=CheungSSHControler() + param={"cmd":cmd,"sid":s,"tid":tid} + pool.add_task(controler.command_controler,param) + + else: + + for s in servers: + controler=CheungSSHControler() + controler.command_controler(cmd=cmd,sid=s,tid=tid) + + elif task_type=="file": + pass + else: + raise CheungSSHError("CHB0000000009") + except Exception,e: + cheungssh_info["content"]=str(e) + cheungssh_info["status"]=False + print cheungssh_info + + return cheungssh_info + +class CheungSSHThread(threading.Thread): + def __init__(self,queue): + threading.Thread.__init__(self) + self.queue=queue + self.daemon=True + self.start() + def run(self): + while not self.quque.empty(): + try: + func,kws=self.queue.get() + func(**kws) + except Exception,e: + print e + pass + self.queue.task_done() +class CheungSSHPool(AutoGetThreadNum): + + def __init__(self): + AutoGetThreadNum.__init__(self) + self.thread_num=self.auto_thread() + self.queue=Queue.Queue(self.thread_num) + for i in range(self.thread_num):#循环多少次,实际上是一个数字 + CheungSSHThread(self.queue) + def add_task(self,func,dict): + self.queue.put((func,dict))#把参数和函数,放到队列里面去,然后,有一个run会来这里取的 + def all_complete(self): + self.queue.join() + +def test_func(**kws): + time.sleep(0.5) + print "哈哈,这里是你穿的参数",kws + +if __name__=='__main__': + p=CheungSSHPool() + for i in range(20): + p.add_task(test_func,{"username":i}) + p.all_complete() -- Gitee From 2f7e3e6bae0d35f4ae7e77399c8e1e7bbe2f2334 Mon Sep 17 00:00:00 2001 From: JohnSaxon Date: Wed, 18 Oct 2017 19:47:18 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E7=B3=BB=E7=BB=9F?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E8=80=97=E5=B0=BDbug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mysite/cheungssh/cheungssh_auto_thread.py | 49 ++++++++++--------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/mysite/mysite/cheungssh/cheungssh_auto_thread.py b/mysite/mysite/cheungssh/cheungssh_auto_thread.py index aee9105..9c62390 100644 --- a/mysite/mysite/cheungssh/cheungssh_auto_thread.py +++ b/mysite/mysite/cheungssh/cheungssh_auto_thread.py @@ -1,24 +1,25 @@ -#!/usr/bin/env python -#coding:utf-8 - -class AutoGetThreadNum: - def __init__(self): - - self.cpu=100 - self.thread=5 - def auto_thread(self): - if self.cpu>90: - self.thread=200 - elif self.cpu>80: - self.thread=150 - elif self.cpu>70: - self.thread=100 - elif self.cpu>60: - self.thread=50 - elif self.cpu>50: - self.thread=40 - elif self.cpu>30: - self.thread=20 - elif self.cpu>20: - self.thread=5 - return self.thread +#!/usr/bin/env python +#coding:utf-8 +from multiprocessing import cpu_count + +class AutoGetThreadNum: + def __init__(self): + + self.cpu=cpu_count()*2 ###cpu_count会得出逻辑cpu数,目前大多数cpu都有超线程,所以*2 + self.thread=5 + def auto_thread(self): + if self.cpu>90: + self.thread=200 + elif self.cpu>80: + self.thread=150 + elif self.cpu>70: + self.thread=100 + elif self.cpu>60: + self.thread=50 + elif self.cpu>50: + self.thread=40 + elif self.cpu>30: + self.thread=20 + elif self.cpu>20: + self.thread=5 + return self.thread -- Gitee From 71036e837ac2e47bbb25efd9e369a954a2a9906d Mon Sep 17 00:00:00 2001 From: JohnSaxon Date: Wed, 18 Oct 2017 19:53:32 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E6=9B=B4=E6=96=B0=20cheungssh=5Fthread=5Fq?= =?UTF-8?q?ueue.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mysite/mysite/cheungssh/cheungssh_thread_queue.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/mysite/mysite/cheungssh/cheungssh_thread_queue.py b/mysite/mysite/cheungssh/cheungssh_thread_queue.py index 050449c..20d0a65 100644 --- a/mysite/mysite/cheungssh/cheungssh_thread_queue.py +++ b/mysite/mysite/cheungssh/cheungssh_thread_queue.py @@ -79,6 +79,15 @@ class CheungSSHPool(AutoGetThreadNum): self.queue=Queue.Queue(self.thread_num) for i in range(self.thread_num):#循环多少次,实际上是一个数字 CheungSSHThread(self.queue) + # def __init__(self, thread_num): + # AutoGetThreadNum.__init__(self) + # if thread_num < 256: + # self.thread_num=thread_num + # else: + # self.thread_num=256 ###最好可以通过初始化时外部传入参数,有多少任务,建多少thread,如果超过最大线程数取最大线程数 + # self.queue=Queue.Queue(self.thread_num) + # for i in range(self.thread_num):#循环多少次,实际上是一个数字 + # CheungSSHThread(self.queue) def add_task(self,func,dict): self.queue.put((func,dict))#把参数和函数,放到队列里面去,然后,有一个run会来这里取的 def all_complete(self): -- Gitee