diff --git a/Events.py b/Events.py index c52d4efc2b5dbd08d01d7211b12a5c8b852a80f4..1cba5fb57344ce80b8afe64190b1c19d94b8d682 100644 --- a/Events.py +++ b/Events.py @@ -14,7 +14,7 @@ class MyFileSystemEventHander(FileSystemEventHandler): if time.time()-eventgtimexz > 3: eventgtimexz=time.time() # print('* 更新文件:%s' % event.src_path) - # time.sleep(2) + # time.sleep(10) time.sleep(0.2) self.restart() class Events: diff --git a/common/autoload.py b/common/autoload.py index a015f40f0cfff3d1adde492eb1d36e360646cf3e..9d1c5cea66b8dbc77d9c7db0b413027949641009 100644 --- a/common/autoload.py +++ b/common/autoload.py @@ -232,6 +232,79 @@ class kcwebsocket: # def start(): # kwebsocket=kcwebsocket() # kwebsocket.start() + + +class DFAFilter(): + """DFA算法文字过滤器""" + __sensitivelist=[] + def __init__(self,sensitivelist): + """ + sensitivelist 敏感字符串列表 + """ + self.keyword_chains = {} # 关键词链表 + self.delimit = '\x00' # 限定 + for keyword in sensitivelist: + self.__add(str(keyword).strip()) + def __add(self, keyword): + keyword = keyword.lower() # 关键词英文变为小写 + chars = keyword.strip() # 关键字去除首尾空格和换行 + if not chars: # 如果关键词为空直接返回 + return + level = self.keyword_chains + # 遍历关键字的每个字 + for i in range(len(chars)): + # 如果这个字已经存在字符链的key中就进入其子字典 + if chars[i] in level: + level = level[chars[i]] + else: + if not isinstance(level, dict): + break + for j in range(i, len(chars)): + level[chars[j]] = {} + last_level, last_char = level, chars[j] + level = level[chars[j]] + last_level[last_char] = {self.delimit: 0} + break + if i == len(chars) - 1: + level[self.delimit] = 0 + def filter(self, message, repl=""): + """ 获取过滤后的字符串 + + message 待处理的字符串 + + repl 替换内容 + """ + message = message.lower() + ret = [] + start = 0 + while start < len(message): + level = self.keyword_chains + step_ins = 0 + for char in message[start:]: + if char in level: + step_ins += 1 + if self.delimit not in level[char]: + level = level[char] + else: + ret.append(repl * step_ins) + start += step_ins - 1 + break + else: + ret.append(message[start]) + break + else: + ret.append(message[start]) + start += 1 + return ''.join(ret) + def sensitive(self,message): + """ 判断字符串是否包含敏感 + + message 待处理的字符串 + """ + if self.filter(message) == message: + return True + else: + return False redis=kcwredis() def send_mail(user,text="邮件内容",theme="邮件主题",recNick="收件人昵称"): """发送邮件 @@ -542,7 +615,7 @@ def list_to_tree(data, pk = 'id', pid = 'pid', child = 'lowerlist', root=0,child return arr -class zip: +class kcwebzip: def packzip(src,dst): "压缩" filelist = [] @@ -850,7 +923,7 @@ class create: """ """打包模块""" if os.path.exists(self.appname+"/"+self.modular+"/controller/"+plug): - zip.packzip(self.appname+"/"+self.modular+"/controller/"+plug,self.appname+"/"+self.modular+"/controller/"+plug+".zip") + kcwebzip.packzip(self.appname+"/"+self.modular+"/controller/"+plug,self.appname+"/"+self.modular+"/controller/"+plug+".zip") return True,"成功" else: return False,"失败" @@ -947,7 +1020,7 @@ class create: i=0 j+=1 arr=arr['data'] - r=requests.get(arr['dowurl']) + r=requests.get(arr['dowurl'],verify=False) f = open(self.appname+"/"+self.modular+"/controller/"+plug+".zip", "wb") for chunk in r.iter_content(chunk_size=512): if chunk: @@ -963,7 +1036,7 @@ class create: else: return False,self.modular+"模块下找不到"+str(plug)+"插件" if os.path.isfile(self.appname+"/"+self.modular+"/controller/"+plug+".zip"):#安装打包好的插件 - zip.unzip_file(self.appname+"/"+self.modular+"/controller/"+plug+".zip",self.appname+"/"+self.modular+"/controller/"+plug+"/") + kcwebzip.unzip_file(self.appname+"/"+self.modular+"/controller/"+plug+".zip",self.appname+"/"+self.modular+"/controller/"+plug+"/") os.remove(self.appname+"/"+self.modular+"/controller/"+plug+".zip") if os.path.isfile(self.appname+"/"+self.modular+"/controller/"+plug+"/install.txt"): #安装依赖包 install_requires=[] @@ -1044,7 +1117,7 @@ class create: def packmodular(self): """打包模块""" if os.path.exists(self.appname+"/"+self.modular): - zip.packzip(self.appname+"/"+self.modular,self.appname+"/"+self.modular+".zip") + kcwebzip.packzip(self.appname+"/"+self.modular,self.appname+"/"+self.modular+".zip") return True,"成功" else: return False,"失败" @@ -1119,7 +1192,7 @@ class create: if chunk: f.write(chunk) f.close() - zip.unzip_file("./app.zip","./"+self.appname) + kcwebzip.unzip_file("./app.zip","./"+self.appname) os.remove("./app.zip") if not os.path.isfile("./server.py"): if "Windows" in platform.platform(): @@ -1209,7 +1282,7 @@ class create: break i+=1 if os.path.isfile(self.appname+"/"+self.modular+".zip"):#安装打包好的模块 - zip.unzip_file(self.appname+"/"+self.modular+".zip",self.appname+"/"+self.modular+"/") + kcwebzip.unzip_file(self.appname+"/"+self.modular+".zip",self.appname+"/"+self.modular+"/") os.remove(self.appname+"/"+self.modular+".zip") if os.path.isfile(self.appname+"/"+self.modular+"/install.txt"): #安装依赖包 diff --git a/config/__init__.py b/config/__init__.py index 938fc680fc25a6226a87e2b8a24ec4c582f7f4ec..cf527739f29dbd4f823a8fbc2b9ba46cbeea5eb7 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -26,17 +26,18 @@ redis['ex']=0 #过期时间 (秒) #缓存配置 cache={} -cache['type']='File' #驱动方式 支持 File Redis +cache['type']='File' #驱动方式 支持 File Redis Python cache['path']='runtime/cachepath' #缓存保存目录 cache['expire']=120 #缓存有效期 0表示永久缓存 cache['host']=redis['host'] #Redis服务器地址 cache['port']=redis['port'] #Redis 端口 cache['password']=redis['password'] #Redis登录密码 cache['db']=1 #Redis数据库 注:Redis用1或2或3等表示 +cachevalue={} # session配置 session={} -session['type']='File' #session 存储类型 支持 file、Redis +session['type']='File' #session 存储类型 支持 file、Redis、Python session['path']='runtime/session' #session缓存目录 session['expire']=86400 #session默认有效期 该时间是指session在服务的保留时间,通常情况下浏览器上会保留该值的10倍 session['prefix']="KCW" # SESSION 前缀 @@ -100,7 +101,7 @@ email['recNick']='' #默认收件人昵称 kcweb={} kcweb['name']='kcweb' #项目的名称 -kcweb['version']='4.12.5' #项目版本 +kcweb['version']='4.13.18' #项目版本 kcweb['description']='' #项目的简单描述 kcweb['long_description']='' #项目详细描述 kcweb['license']='MIT' #开源协议 mit开源 diff --git a/setup.py b/setup.py index 2b3993ed5ee67838a5e30feecc491a175be3d221..75f1fa3ab1daf88af4095fe3dfd3d70169ca1f36 100644 --- a/setup.py +++ b/setup.py @@ -1,10 +1,10 @@ -# python setup.py sdist upload +# python3 setup.py sdist upload # twine upload --repository-url https://test.pypi.org/legacy/ dist/* #上传到测试 # pip install --index-url https://pypi.org/simple/ kcweb #安装测试服务上的kcweb pip3 install kcweb==4.12.4 -i https://pypi.org/simple/ ############################################# from setuptools import setup, find_packages,Extension -import os +import os,shutil def file_get_content(k): "获取文件内容" if os.path.isfile(k): @@ -15,10 +15,10 @@ def file_get_content(k): con='' return con confkcw={} -confkcw['name']='kcweb' #项目的名称 -confkcw['version']='4.12.5' #项目版本 +confkcw['name']='kcweb' #项目的名称 +confkcw['version']='4.13.18' #项目版本 confkcw['description']='' #项目的简单描述 -confkcw['long_description']="增加websocket" #项目详细描述 +confkcw['long_description']="增加任务队列进度条" #项目详细描述 confkcw['license']='MIT License' #开源协议 mit开源 confkcw['url']='' confkcw['author']='禄可集团-坤坤' #名字 diff --git a/utill/cache/cache.py b/utill/cache/cache.py index 8df20e7079fabaf114a35273f14c9b8e1c4b483e..357768f16e6fb3fabfd8261c96ce3fd53d2d8059 100644 --- a/utill/cache/cache.py +++ b/utill/cache/cache.py @@ -2,7 +2,7 @@ import os,sys,time,hashlib,json from kcweb import config from kcweb.utill import rediss as red -from kcweb.utill.db.mysql import mysql +# from kcweb.utill.db.mysql import mysql import time,hashlib def md5(strs): @@ -36,20 +36,20 @@ class cache: __config=config.cache __redisobj=None __mysqlobj=None - def __setmysqlonj(self): - conf=config.database - if 'host' in self.__config and self.__config['host']: - conf['host']=[self.__config['host']] - if 'port' in self.__config and self.__config['port']: - conf['port']=[self.__config['port']] - if 'user' in self.__config and self.__config['user']: - conf['user']=[self.__config['user']] - if 'password' in self.__config and self.__config['password']: - conf['password']=[self.__config['password']] - if 'db' in self.__config and self.__config['db']: - conf['db']=[self.__config['db']] - db=mysql() - self.__mysqlobj=db.connect(conf) + # def __setmysqlonj(self): + # conf=config.database + # if 'host' in self.__config and self.__config['host']: + # conf['host']=[self.__config['host']] + # if 'port' in self.__config and self.__config['port']: + # conf['port']=[self.__config['port']] + # if 'user' in self.__config and self.__config['user']: + # conf['user']=[self.__config['user']] + # if 'password' in self.__config and self.__config['password']: + # conf['password']=[self.__config['password']] + # if 'db' in self.__config and self.__config['db']: + # conf['db']=[self.__config['db']] + # db=mysql() + # self.__mysqlobj=db.connect(conf) def __setredisobj(self): "设置redis链接实例" conf=config.redis @@ -132,53 +132,100 @@ class cache: return self.__getrediscache() elif types=='del': return self.__delrediscache() - elif self.__config['type'] == 'MySql': - self.__setmysqlonj() + # elif self.__config['type'] == 'MySql': + # self.__setmysqlonj() + # if types == 'set': + # return self.__setmysqlcache() + # elif types == 'get': + # return self.__getmysqlcache() + # elif types == 'del': + # return self.__delmysqlcache() + elif self.__config['type'] == 'Python': if types == 'set': - return self.__setmysqlcache() + return self.__setpythoncache() elif types == 'get': - return self.__getmysqlcache() + return self.__getpythoncache() elif types == 'del': - return self.__delmysqlcache() + return self.__delpythoncache() else: raise Exception("缓存类型错误") - def __setmysqlcache(self): ######################################################################################## - """设置mysql缓存 + def __setpythoncache(self): + """设置python缓存 return Boolean类型 """ - data=[str(self.__values)] - strs="[" - for k in data: - strs=strs+k - strs=strs+"]" - k=self.__mysqlobj.table('fanshukeji_core_cache').where("name",self.__name).count('id') - self.__setmysqlonj() - if k: - return self.__mysqlobj.table('fanshukeji_core_cache').where("name",self.__name).update({"val":strs,"expire":self.__config['expire'],"time":times()}) - else: - return self.__mysqlobj.table('fanshukeji_core_cache').insert({"name":self.__name,"val":strs,"expire":self.__config['expire'],"time":times()}) - def __getmysqlcache(self): - """获取mysql缓存 + data={ + 'expire':self.__config['expire'], + 'time':times(), + 'values':self.__values + } + config.cachevalue[self.__name]=data + return True + def __getpythoncache(self): + """获取python缓存 return 缓存的值 """ - data=self.__mysqlobj.table('fanshukeji_core_cache').where("name",self.__name).find() - if data : - if data['expire']>0 and times()-data['time']>data['expire']: - self.__setmysqlonj() - self.__mysqlobj.table('fanshukeji_core_cache').where("name",self.__name).delete() - return False - else: - return eval(data['val'])[0] + try: + ar=config.cachevalue[self.__name] + except KeyError: + return "" else: - return False - def __delmysqlcache(self): - """删除mysql缓存 + if ar['expire'] > 0: + if (times()-ar['time']) > ar['expire']: + self.__delpythoncache() + return "" + else: + return ar['values'] + else: + return ar['values'] + def __delpythoncache(self): + """删除python缓存 return Boolean类型 """ - return self.__mysqlobj.table('fanshukeji_core_cache').where("name",self.__name).delete() + try: + del config.cachevalue[self.__name] + except KeyError: + pass + return True + # def __setmysqlcache(self): ######################################################################################## + # """设置mysql缓存 + + # return Boolean类型 + # """ + # data=[str(self.__values)] + # strs="[" + # for k in data: + # strs=strs+k + # strs=strs+"]" + # k=self.__mysqlobj.table('fanshukeji_core_cache').where("name",self.__name).count('id') + # self.__setmysqlonj() + # if k: + # return self.__mysqlobj.table('fanshukeji_core_cache').where("name",self.__name).update({"val":strs,"expire":self.__config['expire'],"time":times()}) + # else: + # return self.__mysqlobj.table('fanshukeji_core_cache').insert({"name":self.__name,"val":strs,"expire":self.__config['expire'],"time":times()}) + # def __getmysqlcache(self): + # """获取mysql缓存 + + # return 缓存的值 + # """ + # data=self.__mysqlobj.table('fanshukeji_core_cache').where("name",self.__name).find() + # if data : + # if data['expire']>0 and times()-data['time']>data['expire']: + # self.__setmysqlonj() + # self.__mysqlobj.table('fanshukeji_core_cache').where("name",self.__name).delete() + # return False + # else: + # return eval(data['val'])[0] + # else: + # return False + # def __delmysqlcache(self): + # """删除mysql缓存 + + # return Boolean类型 + # """ + # return self.__mysqlobj.table('fanshukeji_core_cache').where("name",self.__name).delete() def __setrediscache(self): """设置redis缓存 diff --git a/utill/db/mysql.py b/utill/db/mysql.py index 905706420082335ae84d5524aa6617ce2a8341b8..1c3558b2de3c1cb03721440efa5f10362d45e7ec 100644 --- a/utill/db/mysql.py +++ b/utill/db/mysql.py @@ -2,8 +2,20 @@ from .pymysql import connect,escape_string # import config.conf as config import kcweb.config as config -import time,traceback,decimal,random,copy +import time,traceback,decimal,random,copy,hashlib +from kcweb.utill.cache import cache as kcwcache +mycache=copy.deepcopy(config.cache) +mycache['type']='Python' + dbconfig=config.database +def md5(strs): + """md5加密""" + if not strs: + return strs + m = hashlib.md5() + b = strs.encode(encoding='utf-8') + m.update(b) + return m.hexdigest() class mysql: """数据库实例""" __config=dbconfig @@ -304,6 +316,82 @@ class mysql: self.__table=table return self __patternerrorcount=0 # 长连接情况下 错误次数 + def __setcursor(self,typess='DQL'): + """设置游标 + + 参数 type :数据查询语言DQL,数据操纵语言DML,数据定义语言DDL,数据控制语言DCL + """ + self.__setdbcount() + # self.__DBsingle=self.__config['pattern'] + self.__dbObjcount=self.__config['dbObjcount'] + self.__errorcount=self.__config['break'] + self.__connects() + bs='' + try: + if self.__config['deploy']==0: # 集中式(单一服务器) + if self.__config['pattern']: # 长连接情况下 + bs=self.__masteridentifier + # types='master' + self.__dbobjident=random.randint(0, self.__dbObjcount-1) + self.__cursor=self.__conn[self.__masteridentifier][self.__dbobjident]['obj'].cursor() #获取随机数据库链接实例 + else: + self.__cursor=self.__conn[self.__masteridentifier].cursor() #获取随机数据库链接实例 + elif self.__config['deploy']==1: # 分布式(主从服务器) + if self.__config['pattern']: # 长连接情况下 + bs=self.__masteridentifier + slavecount=self.__dbcount-self.__config['master_num'] #从服务器数量 + # types='master' + if typess=='DQL': #数据查询语言DQL + if self.__config['master_dql']: #主服务器可以执行DQL + if slavecount: + if random.randint(0,1): + bs=self.__masteridentifier + # types='master' + self.__dbobjident=random.randint(0,self.__dbObjcount*self.__config['master_num']-1) + self.__cursor=self.__conn[self.__masteridentifier][self.__dbobjident]['obj'].cursor() #获取随机数据库链接实例 主服务器 + else: + bs=self.__slaveidentifier + # types='slave' + self.__dbobjident=random.randint(0,self.__dbObjcount*slavecount-1) + self.__cursor=self.__conn[self.__slaveidentifier][self.__dbobjident]['obj'].cursor() #获取随机数据库链接实例 从服务器 + else: + bs=self.__masteridentifier + # types='master' + self.__dbobjident=random.randint(0,self.__dbObjcount*self.__config['master_num']-1) + self.__cursor=self.__conn[self.__masteridentifier][self.__dbobjident]['obj'].cursor() #获取随机数据库链接实例 主服务器 + else: + bs=self.__slaveidentifier + # types='slave' + self.__dbobjident=random.randint(0,self.__dbObjcount*slavecount-1) + self.__cursor=self.__conn[self.__slaveidentifier][self.__dbobjident]['obj'].cursor() #获取随机数据库链接实例 从服务器 + else: #数据操纵语言DML,数据定义语言DDL,数据控制语言DCL + bs=self.__masteridentifier + # types='master' + self.__dbobjident=random.randint(0,self.__dbObjcount*self.__config['master_num']-1) + self.__cursor=self.__conn[self.__masteridentifier][self.__dbobjident]['obj'].cursor() #获取随机数据库链接实例 主服务器 + self.__conn[bs][self.__dbobjident]['error']=0 + else: + self.__cursor=self.__conn[self.__masteridentifier].cursor() + except Exception as e: # 铺获未知异常 + raise Exception(e) + else: + self.__patternerrorcount=0 + return self.__cursor + + def get_exec_cursor(self): + """获取执行游标(不推荐使用) + + 参数 type :数据查询语言DQL,数据操纵语言DML,数据定义语言DDL,数据控制语言DCL + """ + cursor=self.__setcursor("DML") + return cursor + def get_query_cursor(self): + """获取查询游标(不推荐使用) + + 参数 type :数据查询语言DQL,数据操纵语言DML,数据定义语言DDL,数据控制语言DCL + """ + cursor=self.__setcursor("DQL") + return cursor def __execute(self,typess='DQL'): """执行sql语句 @@ -478,8 +566,6 @@ class mysql: i=i+1 lists.append(dicts) return lists - - def select(self,id=None): """select查询 @@ -492,12 +578,16 @@ class mysql: self.__sqls="("+self.__sql+")" self.__None() return self.__sqls - + if self.__cache and self.__cache[0]: + cou=kcwcache.cache().set_config(mycache).get_cache(md5(self.__sql+str(self.__config['db'][0])+str(self.__config['host'][0]))) + if cou: + self.__None() + return cou self.__execute() description=self.__cursor.description #获取字段 result = self.__cursor.fetchall() #获取查询结果 self.__cursor.close() - self.__None() + # self.__None() lists=[] keys =[] for field in description:#获取字段 @@ -511,6 +601,9 @@ class mysql: k=float(row[q]) item[keys[q]] = k lists.append(item) + if lists and self.__cache and self.__cache[0]: + kcwcache.cache().set_config(mycache).set_cache(md5(self.__sql+str(self.__config['db'][0])+str(self.__config['host'][0])),lists,self.__cache[1]) + self.__None() return lists def find(self,id=None): """查询一条记录 @@ -525,11 +618,16 @@ class mysql: self.__sqls="("+self.__sql+")" self.__None() return self.__sqls + if self.__cache and self.__cache[0]: + item=kcwcache.cache().set_config(mycache).get_cache(md5(self.__sql+str(self.__config['db'][0])+str(self.__config['host'][0]))) + if item: + self.__None() + return item self.__execute() description=self.__cursor.description #获取字段 result = self.__cursor.fetchall() #获取查询结果 self.__cursor.close() - self.__None() + # self.__None() item = dict() keys =[] for field in description:#获取字段 @@ -541,8 +639,38 @@ class mysql: if type(row[q])==decimal.Decimal: k=float(row[q]) item[keys[q]] = k + if item and self.__cache and self.__cache[0]: + kcwcache.cache().set_config(mycache).set_cache(md5(self.__sql+str(self.__config['db'][0])+str(self.__config['host'][0])),item,self.__cache[1]) + self.__None() return item - + def value(self,field): + """查询指定字段值 + + 返回 一个字段 + """ + self.__field=field + self.limit(1) + self.__setsql() + if self.__buildSql: + self.__sqls="("+self.__sql+")" + self.__None() + return self.__sqls + if self.__cache and self.__cache[0]: + cou=kcwcache.cache().set_config(mycache).get_cache(md5(self.__sql+str(self.__config['db'][0])+str(self.__config['host'][0]))) + if cou: + self.__None() + return cou + self.__execute() + result = self.__cursor.fetchall() #获取查询结果 + self.__cursor.close() + # self.__None() + strs='' + if result: + strs=result[0][0] + if strs and self.__cache and self.__cache[0]: + kcwcache.cache().set_config(mycache).set_cache(md5(self.__sql+str(self.__config['db'][0])+str(self.__config['host'][0])),strs,self.__cache[1]) + self.__None() + return strs def count(self,field="*"): """查询数量 @@ -554,6 +682,11 @@ class mysql: self.__sqls="("+self.__sql+")" self.__None() return self.__sql + if self.__cache and self.__cache[0]: + cou=kcwcache.cache().set_config(mycache).get_cache(md5(self.__sql+str(self.__config['db'][0])+str(self.__config['host'][0]))) + if cou: + self.__None() + return cou self.__execute() result = self.__cursor.fetchall() #获取查询结果 self.__cursor.close() @@ -565,6 +698,9 @@ class mysql: except IndexError: cou=0 # self.__None() + if cou and self.__cache and self.__cache[0]: + kcwcache.cache().set_config(mycache).set_cache(md5(self.__sql+str(self.__config['db'][0])+str(self.__config['host'][0])),cou,self.__cache[1]) + self.__None() return cou def max(self,field): """查询某字段的最大值 @@ -577,10 +713,18 @@ class mysql: self.__sqls="("+self.__sql+")" self.__None() return self.__sql + if self.__cache and self.__cache[0]: + cou=kcwcache.cache().set_config(mycache).get_cache(md5(self.__sql+str(self.__config['db'][0])+str(self.__config['host'][0]))) + if cou: + self.__None() + return cou self.__execute() result = self.__cursor.fetchall() #获取查询结果 self.__cursor.close() cou=int(result[0][0]) + # self.__None() + if cou and self.__cache and self.__cache[0]: + kcwcache.cache().set_config(mycache).set_cache(md5(self.__sql+str(self.__config['db'][0])+str(self.__config['host'][0])),cou,self.__cache[1]) self.__None() return cou def min(self,field): @@ -594,11 +738,19 @@ class mysql: self.__sqls="("+self.__sql+")" self.__None() return self.__sql + if self.__cache and self.__cache[0]: + cou=kcwcache.cache().set_config(mycache).get_cache(md5(self.__sql+str(self.__config['db'][0])+str(self.__config['host'][0]))) + if cou: + self.__None() + return cou self.__execute() result = self.__cursor.fetchall() #获取查询结果 self.__cursor.close() - self.__None() + # self.__None() cou=int(result[0][0]) + if cou and self.__cache and self.__cache[0]: + kcwcache.cache().set_config(mycache).set_cache(md5(self.__sql+str(self.__config['db'][0])+str(self.__config['host'][0])),cou,self.__cache[1]) + self.__None() return cou def avg(self,field): """查询某字段的平均值 @@ -611,11 +763,19 @@ class mysql: self.__sqls="("+self.__sql+")" self.__None() return self.__sql + if self.__cache and self.__cache[0]: + cou=kcwcache.cache().set_config(mycache).get_cache(md5(self.__sql+str(self.__config['db'][0])+str(self.__config['host'][0]))) + if cou: + self.__None() + return cou self.__execute() result = self.__cursor.fetchall() #获取查询结果 self.__cursor.close() - self.__None() + # self.__None() cou=int(result[0][0]) + if cou and self.__cache and self.__cache[0]: + kcwcache.cache().set_config(mycache).set_cache(md5(self.__sql+str(self.__config['db'][0])+str(self.__config['host'][0])),cou,self.__cache[1]) + self.__None() return cou def sum(self,field): """查询某字段之和 @@ -628,13 +788,22 @@ class mysql: self.__sqls="("+self.__sql+")" self.__None() return self.__sql + if self.__cache and self.__cache[0]: + cou=kcwcache.cache().set_config(mycache).get_cache(md5(self.__sql+str(self.__config['db'][0])+str(self.__config['host'][0]))) + if cou: + self.__None() + return cou self.__execute() result = self.__cursor.fetchall() #获取查询结果 self.__cursor.close() - self.__None() + # self.__None() cou=result[0][0] if not cou: cou=0 + else: + if self.__cache and self.__cache[0]: + kcwcache.cache().set_config(mycache).set_cache(md5(self.__sql+str(self.__config['db'][0])+str(self.__config['host'][0])),cou,self.__cache[1]) + self.__None() return cou def setinc(self,field,key=1,affair=False): @@ -764,6 +933,7 @@ class mysql: self.__table=None self.__buildSql=None self.__table=None + self.__cache=None __where=None __wheres=() @@ -921,16 +1091,15 @@ class mysql: self.__lock=strs return self - # __cache=[] - # def cache(self,endtime,tag=None): - # """设置查询缓存 + __cache=[] + def cache(self,status=None,endtime=86400): + """设置查询缓存 + status 缓存开关 - # 参数 endtime:int 缓存数据 0永久 - - # 参数 tag:int 缓存标签 - # """ - # self.__cache=[endtime,tag] - # return self + 参数 endtime:int 缓存时间 0永久 + """ + self.__cache=[status,endtime] + return self def __setsql(self,types=None,data = {}): """生成sql语句""" if types==None: diff --git a/utill/queues.py b/utill/queues.py index 40dd467f7875b7a4e29bc536cf2373abb7b662fe..ae6b8c090d97c4f6e2ef074fde13d643a47c2199 100644 --- a/utill/queues.py +++ b/utill/queues.py @@ -1,7 +1,7 @@ from queue import Queue from .db import model from .db import sqlite as kcwsqlite -import threading,time,os,hashlib,random +import threading,time,os,hashlib,random,traceback,multiprocessing queuesdbpath=os.path.split(os.path.realpath(__file__))[0]+"/Queues" class model_kcweb_task(model.model): "任务" @@ -16,11 +16,17 @@ class model_kcweb_task(model.model): "code":model.dbtype.int(LEN=11,DEFAULT=2), #状态码 0成功 1失败 2等待中 3正在执行 4完成 "msg":model.dbtype.text(), #状态描述 "error":model.dbtype.text(), #异常信息 - "addtime":model.dbtype.int(LEN=11,DEFAULT=0) #添加时间 + "start":model.dbtype.varchar(LEN=11,DEFAULT=0), #进度条起始值 + "end":model.dbtype.int(LEN=11,DEFAULT=100), #进度条结束值 + "starts":model.dbtype.varchar(LEN=11,DEFAULT=0), #每秒钟进度条起始值增加多少 + "addtime":model.dbtype.int(LEN=11,DEFAULT=0), #添加时间 + "endtime":model.dbtype.int(LEN=11,DEFAULT=0), #结束时间 + "updtime":model.dbtype.int(LEN=11,DEFAULT=0) #更新时间 } class Queues(): __globalqueue=None - def start(): + __processglobalqueue=None + def __start(): if not Queues.__globalqueue: if not os.path.isfile(queuesdbpath): model_kcweb_tasks=model_kcweb_task() @@ -34,52 +40,146 @@ class Queues(): t=threading.Thread(target=Queues.__messagequeue) t.daemon=True t.start() + def __startprocess(): + if not Queues.__processglobalqueue: + if not os.path.isfile(queuesdbpath): + model_kcweb_tasks=model_kcweb_task() + model_kcweb_tasks.create_table() + try: + kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").find() + except: + model_kcweb_tasks=model_kcweb_task() + model_kcweb_tasks.create_table() + Queues.__processglobalqueue=multiprocessing.Queue() + t=multiprocessing.Process(target=Queues.messagequeueprocess,args=(Queues.__processglobalqueue,)) + t.daemon=True + t.start() + def messagequeueprocess(processglobalqueue): + """不支持外部调用""" + kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where("code=2 and updtime<"+str(int(time.time())-86400*7)).delete() + kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where("code=3 and updtime<"+str(int(time.time())-1200)).delete() + while True: + if not processglobalqueue.empty(): + value=processglobalqueue.get() + kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where("taskid = '"+value['task']['taskid']+"' and code!=4").update({"code":3,"error":""}) + if value['args']: + try: + value['target'](*value['args']) + except: + kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where("taskid = '"+value['task']['taskid']+"' and code!=4").update({"code":1,'endtime':int(time.time()),'updtime':int(time.time()),"error":str(traceback.format_exc())}) + else: + kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where("taskid = '"+value['task']['taskid']+"' and code!=4").update({"code":4,"start":100,'endtime':int(time.time()),'updtime':int(time.time())}) + else: + try: + value['target']() + except: + kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where("taskid = '"+value['task']['taskid']+"' and code!=4").update({"code":1,'endtime':int(time.time()),'updtime':int(time.time()),"error":str(traceback.format_exc())}) + else: + kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where("taskid = '"+value['task']['taskid']+"' and code!=4").update({"code":4,"start":100,'endtime':int(time.time()),'updtime':int(time.time())}) + else: + time.sleep(1) def __messagequeue(): # time.sleep(0.01) - kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where("code","in",2,3).delete() + kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where("code=2 and updtime<"+str(int(time.time())-86400*7)).delete() + kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where("code=3 and updtime<"+str(int(time.time())-1200)).delete() while True: if not Queues.__globalqueue.empty(): value=Queues.__globalqueue.get() - kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where("taskid = '"+value['task']['taskid']+"' and code!=4").update({"code":3,"msg":"正在执行","error":""}) + kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where("taskid = '"+value['task']['taskid']+"' and code!=4").update({"code":3,"error":""}) if value['args']: try: value['target'](*value['args']) - except Exception as e: - kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where("taskid = '"+value['task']['taskid']+"' and code!=4").update({"code":1,"msg":"失败","error":str(e)}) + except: + kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where("taskid = '"+value['task']['taskid']+"' and code!=4").update({"code":1,'endtime':int(time.time()),'updtime':int(time.time()),"error":str(traceback.format_exc())}) else: - kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where("taskid = '"+value['task']['taskid']+"' and code!=4").update({"code":4,"msg":"执行完成"}) + kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where("taskid = '"+value['task']['taskid']+"' and code!=4").update({"code":4,"start":100,'endtime':int(time.time()),'updtime':int(time.time())}) else: try: value['target']() - except Exception as e: - kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where("taskid = '"+value['task']['taskid']+"' and code!=4").update({"code":1,"msg":"失败","error":str(e)}) + except: + kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where("taskid = '"+value['task']['taskid']+"' and code!=4").update({"code":1,'endtime':int(time.time()),'updtime':int(time.time()),"error":str(traceback.format_exc())}) else: - kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where("taskid = '"+value['task']['taskid']+"' and code!=4").update({"code":4,"msg":"执行完成"}) + kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where("taskid = '"+value['task']['taskid']+"' and code!=4").update({"code":4,"start":100,'endtime':int(time.time()),'updtime':int(time.time())}) else: time.sleep(1) - def insert(target,args=None,title="默认任务",describes="",msg='等待中',taskid=None): #add_queue + def delhist(): + """清除任务历史记录(包括 成功的 失败的 已完成的)""" + return kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where("code","in",0,1,4).delete() + def delwhere(where): + "通过where条件删除 (不推荐使用)" + return kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where(where).delete() + def setfield(taskid,key,value): + """设置指定字段(不建议使用)""" + try: + return kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where("taskid",taskid).update({key:value}) + except: + return False + def setstart(taskid,start=0.001,describes=None): + """增加进度条起始位置 + + start 支持0.001到10 + """ + if start>=0.001 and start<=10: + arr=kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where("taskid",taskid).find() + if arr: + start+=round(float(arr['start']),3) + if start>=0.01 and start <=99.99: + dqsjc=int(time.time())-arr['updtime'] #当前时间差 + if dqsjc>=1: + starts=round(float((float(start)-float(arr['start']))/dqsjc),3) + upddate={"start":start,'starts':starts,'updtime':int(time.time())} + if describes: + upddate['describes']=describes + kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where("taskid",taskid).update(upddate) + return True + else: + return False + else: + return False + else: + False + def insert(target,args=None,title="默认任务",describes='',msg='',taskid=None,start=0,updtime=0,types=''): #add_queue """添加队列 target 方法名 必须 - args 方法参数 非必须 如 + args 方法参数 非必须 如 (参数1,参数2) title 任务名称 describes 任务描述 + msg 状态描述 + + taskid 任务id + + start 进度条开始位置 (建议1到50) + + updtime 进度条更新时间 + + types process表示使用进程执行 否则使用线程执行 + return taskid """ - Queues.start() + if types=='process': + Queues.__startprocess() + else: + Queues.__start() ttt=int(time.time()) if not taskid: m = hashlib.md5() m.update((str(ttt)+str(random.randint(100000,999999))).encode(encoding='utf-8')) taskid=m.hexdigest() - task={"taskid":taskid,"title":title,"describes":describes,"code":2,"msg":msg,"error":"","addtime":ttt} + end=100 + if not updtime: + updtime=ttt + task={"taskid":taskid,"title":title,"describes":describes,"code":2,"msg":msg,"error":"","start":start,"end":end,"addtime":ttt,'starts':0,'endtime':0,'updtime':updtime} key={"target":target,"args":args,"task":task} kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").insert(task) - Queues.__globalqueue.put(key) + if types=='process': + Queues.__processglobalqueue.put(key) + else: + Queues.__globalqueue.put(key) return taskid def getall(pagenow=1,pagesize=20,where=None): """获取全部队列 @@ -106,7 +206,18 @@ class Queues(): return dict """ - return kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").field("taskid,title,describes,code,msg,error,addtime").where("taskid",taskid).find() + arr=kcwsqlite.sqlite().connect(queuesdbpath).table("model_kcweb_queues").where("taskid",taskid).find() + if arr: + arr['start']=float(arr['start']) + dqsjc=arr['updtime']-arr['addtime'] + if dqsjc and arr['start']>=1: + arr['starts']=round(float(arr['start'])/dqsjc,3) + else: + try: + arr['starts']=round(float(arr['starts']),3) + except: + arr['starts']=0 + return arr def iscomplete(): """判断对列中的任务是否全部执行完成