# python_high_level_grammer **Repository Path**: lihaowen2017/python_high_level_grammer ## Basic Information - **Project Name**: python_high_level_grammer - **Description**: python高级语法笔记,包括基础概念,魔术方法,元类编程,异步编程,原子操作等 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 2 - **Forks**: 2 - **Created**: 2020-02-08 - **Last Updated**: 2023-03-27 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # python 高级语法 ## 问题 1. from collections import abc 跟不进去源码 2. 登录功能能否建立hash索引,hash索引空间消耗量有多大 ## python中一切皆对象 ### type, object , class 关系 1. type继承object 2. object是由type实例化的 3. type可以实例化自己 4. python中所有类继承object 5. python中所有都为对象,例如int类是由type实例化的。type是顶层实例化的 ### python的内置类型 对象的三个特征: 身份:对象在内存的地址 id(对象) 查看内存地址 类型:None(全局只有一个)内存地址唯一;数值;迭代类型;序列类型(list,tuple等);映射类型(dict);集合(set, frozenset);上下文管理类型(with);其它(模块类型, class,实例) 值: ### 魔法函数 python 中魔法函数是定义好的。 非数学运算: 1. 字符串的表述 数学运算 ### python的数据模型以及数据模型对python的影响 python 一切皆对象, 魔法方法给予一个类额外的信息 如使一个自定义类的实例化对象变为可迭代对象 ```python def __getitem__(self, item): # 变为可迭代类型 return self.employee[item] # employee序列 ``` ### 鸭子类型 python多态性,不同类实现相同的方法,体现了鸭子类型。 python的魔法函数也体现了鸭子类型。 魔法函数表明了多态性。 类中实现了哪些方法就是哪种类型。 基于协议 ### 抽象基类 1. 在基础类中我们设定好一些方法,所有继承该类的类都必须覆盖这些方法 2. 抽象基类无法实例化 #### 抽象基类的两个应用场景 1. 我们去检查某个类是否有某种方法 ```python class Company(object): def __init__(self, employee_list): self.employee = employee_list # def __getitem__(self, item): # # 变为可迭代类型 # return self.employee[item] def __len__(self): return len(self.employee) company = Company(["tom", "bob", "jane"]) print(hasattr(company, "__len__")) # 判断对象是否有某属性 print(len(company)) # 我们在某些情况之下希望判定某个对象的类型 from collections.abc import Sized # 具有__len__魔法方法 print(isinstance(company, Sized)) ``` 2. 我们需要强制某个子类必须实现某些方法 ```python # 如何取模拟一个抽象基类 class CacheBase(): def get(self, key): raise NotImplementedError def set(self, key, value): raise NotImplementedError class RedisCache(CacheBase): pass # 实例化子类 redis_cache = RedisCache() # 然而只有在调用set方法时才会报错,无法在初始化时直接报错 redis_cache.set("key", "value") ``` ```python # 子类未覆盖父类方法则初始化时直接报错 import abc class CacheBase(metaclass=abc.ABCMeta): @abc.abstractmethod def get(self, key): pass @abc.abstractmethod def set(self, key, value): pass class RedisCache(CacheBase): pass redis_cache = RedisCache() # 初始化直接报错 # TypeError: Can't instantiate abstract class RedisCache with abstract methods get, set ``` 一般不使用抽象基类来实现接口编写,一般使用鸭子类型的多态特性。 ### isinstance 和 type的区别;is和==的区别 ```python class A: pass class B(A): pass b = B() print(isinstance(b, B)) # isinstance判断是否属于继承链 print(isinstance(b, A)) # is 判断id是否相同, 是否为同一对象 # type判断可能存在误差, A也为对象 id(A) 不等于 id(B) 所以type无法判断继承的类型 print(type(b) is B) print(id(type(b))) print(id(B)) # 1811760003272 # 1811760003272 # == 值是否相等 # print(type(b) == B) ``` ### 单例 ```python # 一个类只能创建一个对象 # 字典记录是否已经创建过对象 from functools import wraps def singleton(cls): instances = {} lock = threading.RLock() # 线程锁 @wraps def wrapper(*args, **kwargs): if cls not in instances: with lock: if cls not in instances: instances[cls] = cls(*args, **kwargs) return instances[cls] return wrapper ``` ### 类变量和对象变量 类属性属于类,实例属性属于实例由 self.变量 = 值 确定 实例的属性会向上查找到类的变量,而类的变量无法向下查询到实例的变量 ```python class A: aa = 1 # 类变量 def __init__(self, x, y): # 构造函数 self.x = x # self:实例;x属于对象 self.y = y a = A(2, 3) b = A(6, 7) A.aa = 11 # 修改类的属性 a.aa = 100 # 实例.属性 = 值 ,赋值给实例;此时实例多了一个属性aa;对象上新建了一个aa=100的属性 print(a.x, a.y, a.aa) # 向上查找,实例中没有变量会查找它的类。 print(A.aa) # 11 # print(A.x) # 类变量不能向下查找,不能查找类的实例 print(b.aa) # 11 ``` ### 类属性和实例属性以及查找顺序 MRO 查找算法 C3 线性算法 ### 静态方法、类方法以及对象方法以及参数 1. 实例方法由对象调用,应用于对象变量及类变量等的修改及其他业务场景。 2. 类方法由类进行调用,应用场景往往是实例化对象传入数据的预处理,返回对象的实例化。 3. 静态方法由类进行调用,应用场景是对传入数据进行判断,返回值往往是正误。 ```python class Date: # 构造函数 def __init__(self, year, month, day): self.year = year self.month = month self.day = day # 实例方法 def tomorrow(self): self.day += 1 # 类变量修改 Date.day += 1 # 静态方法:数据预处理逻辑可以放入 # 静态的弊端,硬编码,即类实例化为同名,如果类名字发生改变,则实例化名称需要改变 @staticmethod def parse_from_string(date_str): year, month, day = tuple(date_str.split("-")) # tuple拆包方法 return Date(int(year), int(month), int(day)) @staticmethod def valid_str(date_str): year, month, day = tuple(date_str.split("-")) # 判断时间日期是否合规 if int(year) > 0 and (0 < int(month) <= 12) and (0 < int(day) <= 31): return True else: return False # 类方法 参数:类本身 @classmethod def from_string(cls, date_str): year, month, day = tuple(date_str.split("-")) return cls(int(year), int(month), int(day)) def __str__(self): return "{year}/{month}/{day}".format(year=self.year, month=self.month, day=self.day) if __name__ == '__main__': new_day = Date(2018, 12, 29) new_day.tomorrow() # 实例方法 print(new_day) # 2018-12-31 # 外部处理数据传递给实例化对象 date_str = "2018-12-30" year, month, day = tuple(date_str.split("-")) # tuple拆包方法 new_day = Date(int(year), int(month), int(day)) new_day.tomorrow() # 实例方法 可以这样理解tomorrow(new_day) print(new_day) # 用staticmethod完成初始化 # 静态方法属于Date的作用域 date_str = "2018-12-15" new_day = Date.parse_from_string(date_str) # 静态方法使用 print(new_day) # 用classmethod完成初始化 date_str = "2018-12-10" new_day = Date.from_string(date_str) print(new_day) # staticmethod 使用场景 对预处理数据的校验无需返回数据 print(Date.valid_str("2018-10-33")) ``` ### 数据封装和私有属性 私有属性的访问只能在类中的公共方法进行访问,无法通过实例进行访问 ```python from advanced.chapter03.class_method import Date class User: def __init__(self, birthday): self.__birthday = birthday def get_age(self): # 返回年龄 return 2019 - self.__birthday.year class Student(User): def __init__(self, birthday): self.__birthday = birthday if __name__ == '__main__': user = User(Date(1990, 2, 1)) # print(user.__birthday) print(user._User__birthday) # 私有属性只是将属性名修改,属性修改规则还可以避免同名属性 print(user.get_age()) ``` ### python 对象的自省机制 自省是通过一定的机制查询到对象的内部结构 ```python # 例如:下列方法均可查询内部结构 dir() __dict__ ``` ### super函数 ```python # 既然我们重写了B的构造函数, 为什么还要去调用super,减少代码量,实现代码较好的复用 # super到底执行顺序是什么样的 根据MRO调用super ``` ### mixin 模式继承 mixin模式特点: 类比DRF代码 1. mixin类功能单一 2. 不和基类关联,可以和任意基类组合,基类可以不和mixin关联就能初始化成功 3. 在mixin中不要使用super用法 ### try/except/else/finally ```python def exe_try(): try: print("code 1") raise KeyError return 1 except KeyError as e: print("key error") return 2 else: print("other error") return 3 finally: # 一般用于资源释放 print("finally") return 4 if __name__ == '__main__': result = exe_try() print(result) # 执行过程 # else执行的前提是 try没有报错 # finally 是一定最后执行的 # 如果加入return return 1先入栈,return 2 再压入栈, return 4 最后压入栈;finally执行后,弹出栈所以return 4 ``` ### python中的上下文管理器with 具有__enter__和__exit__两个魔法方法的对象就可以使用with的上下文管理协议 ```python # 上下文管理器协议 class Sample(object): def __enter__(self): print("enter") # 进入with时候调用 # 获取资源 return self def __exit__(self, exc_type, exc_val, exc_tb): # 释放资源 print("exit") # 跳出with后自动调用 def do_something(self): print("doing something") with Sample() as sample: sample.do_something() ``` ### contextlib简化上下文管理器 ```python import contextlib @contextlib.contextmanager # 将一个函数变成一个上下文管理器,函数必须是生成器 def file_open(file_name): print("file open") # 相当于def __enter__(self):里的代码 yield{} print("file end") # 相当于def __exit__(self):里的代码 with file_open("li.txt") as f_opened: print("file processing") ``` ```python # 使用场景 # 上下文装饰器,递归中使用 from contextlib import contextmanager from time import time @ contextmanager def record_time(): start = time() yield 'hello' end = time() print(f'执行时间;{end - start}秒') def fac(num): if num in(0, 1): return 1 return num * fac(num - 1) with record_time() as msg: print(msg) print(fac(900)) ``` ## 自定义序列类 ### 序列类的分类 1. 容器序列:list,tuple,deque 容器序列可以不同类型的数据 2. 扁平序列:str,bytes,bytearray,array.array 扁平序列中的数据类型一致 3. 可变序列:list,deque,bytearray,array 4. 不可变序列:str,tuple,bytes ### 序列的 +,+= 和extend的区别 ```python my_list = [] my_list.append(1) my_list.append("a") from collections import abc # 跟容器相关的数据结构的抽象基类 a = [1, 2, 3] c = a + [3, 4] # + 两边类型必须一致 print(c) # += 就地加 可以整合任意序列类型 a += (3, 4) # 修改原本的list print(a) a.extend(range(3)) # 修改原本的list 可以整合任意序列类型 ``` ### 切片操作回顾 ```python # 模式[start:end:step] """ 其中,第一个数字start表示切片开始位置,默认为0; 第二个数字表示切片截止(但不包含)位置(默认为列表长度); 第三个数字step表示切片的步长(默认为1); 当start为0时可以省略,当end为列表长度时可以省略; 当step为1时可以省略,并且省略步长时可以同时省略最后一个冒号; 另外,当step为负整数时,表示反向切片,当时start应该比end的值要大才行 切片操作返回一个新列表 """ aList = [3, 4, 5, 6, 7, 8, 9, 11, 13, 15, 17] print(aList[::]) # 返回包含原列表中所有元素的新列表 print(aList[::-1]) # 返回包含原列表中所有元素的逆序列表 print(aList[::2]) # 隔一个取一个,获取下标为偶数位置的元素 print(aList[1::2]) # 隔一个取一个, 获取下标为奇数位置的元素 print(aList[3:6]) # 指定切片的开始和结束位置 print(aList[0:100]) # 切片结束位置大于列表长度时,从列表尾部截断 print(aList[100:]) # 切片开始位置大于列表长度时,返回空列表 aList[len(aList):] = [9] # 在列表尾部增加元素 aList[:0] = [1, 2] # 在列表表头插入元素 aList[3:3] = [4] # 在列表下标为3的位置插入元素 aList[:3] = [1, 2] # 替换列表元素,等号两边的列表长度相等 aList[3:] = [4, 5, 6] # 等号两边的列表长度也可以不相等 aList[::2] = [0] * 6 # 隔一个修改一个,等号两边长度一致 aList[::2] = ['a', 'b', 'c', 'd', 'e', 'f'] # 隔一个修改一个,等号两边长度一致 aList[:3] = [] # 删除列表中前三个元素 del aList[:3] # 删除连续元素 del aList[::2] # 隔一个删一个 ``` ### 实现可切片对象 ```python import numbers class Group: # 支持切片操作 def __init__(self, group_name, company_name, staffs): self.group_name = group_name self.company_name = company_name self.staffs = staffs def __reversed__(self): """数据反转""" self.staffs.reverse() def __getitem__(self, item): """变为可切片的对象""" # return self.staffs[item] 返回值类型为list 需求要仍为Group类型 cls = type(self) # 取到实例的类 if isinstance(item, slice): return cls(group_name=self.group_name, company_name=self.company_name, staffs=self.staffs[item]) elif isinstance(item, numbers.Integral): return cls(group_name=self.group_name, company_name=self.company_name, staffs=[self.staffs[item]]) # 变为数组 def __len__(self): return len(self.staffs) def __iter__(self): return iter(self.staffs) def __contains__(self, item): """ if in 判断使用""" if item in self.staffs: return True else: return False staffs = ["a1", "b1", "c1"] group = Group(company_name="znufe", group_name="user", staffs=staffs) sub_group = group[:2] test_group = group[0] print(len(group)) if "a1" in group: print("yes") reversed(group) ``` ### bisect维护已排序的序列(二分查找python库) ```python import bisect # 用来处理已排序的序列,用来维持已排序的序列, 升序 # 二分查找 inter_list = [] # 任何可修改的序列 deque bisect.insort(inter_list, 3) # 插入 bisect.insort(inter_list, 4) bisect.insort(inter_list, 1) bisect.insort(inter_list, 2) bisect.insort(inter_list, 6) print(bisect.bisect_left(inter_list, 3)) # 查找应该插入的位置,相同元素之前, bisect_right,相同元素之后 print(inter_list) ``` ### 什么时候我们不应该使用列表 array, deque array: 数组 使用场景: 例如数据过滤 ```python # 数组 import array # array和list的一个重要区别,array只能存放指定的数据类型 my_array = array.array("i") my_array.append(1) my_array.append(2) print(my_array) ``` ### map reduce filter 函数 ### Django中间件写法 ### 列表推导式、生成器表达式、字典推导式 ```python # 列表生成式(列表生成式) # 1. 提取出1-20之间的奇数 # res = [x for x in range(1, 21) if x % 2] # 2. 逻辑复杂的情况 求1-20中奇数平方 def hanle_item(item): return item * item res = [x ** 2 for x in range(1, 21) if x % 2] # 列表生成式性能高于列表操作 # 生成器表达式 res_gen = (x ** 2 for x in range(1, 21) if x % 2) print(type(res_gen)) # for i in res_gen: # 生成器获取数据 # print(i) res_list = list(res_gen) # list 接受可迭代对象 print(res_list) # 字典推导式 my_dict = {"a1": 23, "b1": 24, "c1": 5, "d1": 23} print(my_dict.items()) # key value 颠倒 reversed_dict = {value:key for key, value in my_dict.items()} print(reversed_dict) # 集合推导式 my_set = {value for key, value in my_dict.items()} print(type(my_set)) print(my_set) # my_set_key = set(my_dict.keys()) # dict 接口调用 # print(my_set_key) ``` ### dict常用方法 ```python a = {"a1": {"company": "znufe"}, "b1": {"company": "qf"}} # clear # a.clear() # pass # copy,返回浅拷贝 只拷贝一层; 深拷贝,将子对象也拷贝 # import copy # new_dict = copy.deepcopy(a) # new_dict["a1"]["company"] = "c3" # pass # fromkeys 可迭代对象转换为字典 new_list = ["b1", "b2"] new_dict = dict.fromkeys(new_list, {"company": "znufe"}) print(new_dict) val = new_dict.get("b", {}) print(val) print(new_dict.items()) for key, value in new_dict.items(): for i, j in new_dict[key].items(): print(i, j) default_val = new_dict.setdefault("b3", {"company": "imooc"}) # 添加数据,效率较高 print(new_dict) new_dict.update(b4="i1", b5="i2") # update 接受可迭代对象更新原有字典 new_dict.update([("b6","i4")]) print(new_dict) ``` ### dict 的子类 ```python # 不推荐直接继承dict 和list # 原因,重写某些方法时,由于底层是c实现的会导致某些方法失效 # 可以继承 UserDict # class Mydict(dict): # def __setitem__(self, key, value): # super(Mydict, self).__setitem__(key, value*2) # # # my_dict = Mydict(one=1) # 重写方法失效 # print(my_dict) # my_dict["one"] = 1 # print(my_dict) # from collections import UserDict # 重写dict 应该继承UserDict # # class NewMydict(UserDict): # def __setitem__(self, key, value): # super(NewMydict, self).__setitem__(key, value*2) # # new_mydict = NewMydict(one=1) # print(new_mydict) from collections import defaultdict mydict = defaultdict(dict) myval = mydict["b1"] # 当找不到相应key时,会进入defaultdict的__missing__方法,实现模拟的defaultdict实现设定默认值 print(myval) ``` ### 集合的使用 ```python # set 集合 fronzenset(不可变集合) 无序,不重复 s = set("abbcde") # 接受一个可迭代的对象 s.add("f") print(s) s1 = frozenset("abcde") # frozenset可以作为dict的key # 向set添加数据 another_set = set("defg") s.update(another_set) # 更新数据 print(s) re_set = s.difference(another_set) # 差集,拥有返回值返回一个新set # set 性能高 # | & - # 集合运算 re_set = s - another_set re_set = s & another_set re_set = s | another_set print(s.issubset(re_set)) # 判断是否为子集 print(re_set) ``` ### 集合、字典效率问题 ```python # dict查找性能远远大于list # 在list中随着list数据轭增大,查找时间会增大 # 在dict中查找元素不会随着dict的增大而增大 # 由于哈希表的使用使dict和set的查找性能都远远高于list # 哈希计算偏移量形成哈希表存入数组内存,数组可以直接存取,不需要遍历 # 哈希冲突解决方法 # "abc" -> ("c" + 随机数) -> hash # 还冲突 # "abc" -> ("bc" + 随机数) -> hash # 往前加一位 计算散列值 # 去重时首选set # dict的key或者set的值都必须是可hash的 # 不可变的对象都是可hash的, __str__, fronzenset, tuple, 自己实现的类 __hash__ # dict 的内存花销大,但是查询速度快,自己定义的对象 或者python内存的对象都是用dict包装的 # dict存储顺序与元素添加顺序有关 python 内部实现了order_dict,尽量不要期望dict的顺序性 # 添加顺序可能改变已有数据的顺序,重新申请内存空间的时候改变数据映射顺序。 ``` ### 对象引用、可变性和垃圾回收 #### python 变量的本质 1. python变量的本质为一个指向内存地址的指针,本身无类型;类似于一个贴在有类型地址上的便利贴 ```python a = 1 a = "abc" # 1. a贴在1上面 # 2. 先生成对象,然后贴便利贴 a = [1, 2, 3] b = a b.append(4) # 操作b也在操作a b和a作为指针,指向同一块儿内存地址 print(a is b) # 是否是同一个对象 print(id(a), id(b)) # id相同,为同一个对象 ``` ### == 和 is的区别 ```python a = [1, 2, 3] b = a b.append(4) print(a is b) # 是否是同一个对象 print(id(a), id(b)) # id相同,为同一个对象 c = [1, 2, 3, 4] d = [1, 2, 3, 4] print(c is d) # False print(id(c), id(d)) print(c == d) # True e = 1 # intern 机制 对于小的整数和字符串 python内部进行优化创建全局唯一的对象,之后再次创建小对象会直接指向之前建立的对象 f = 1 print(id(e), id(f)) print(e is f) # True ``` ### 引用计数、del语句和垃圾回收 ```python # python中垃圾回收的算法是采用引用计数 # 对象有多少个变量指针就有多少个引用计数 a = 1 # 引用计数+1 b = a # 引用计数+1 del a # 引用计数-1 print(b) # 1 print(a) # 报错 # 引用计数为0 时python的垃圾回收机制会回收该内存空间 class A: def __del__(self): # python 解释器回收对象时调用该方法 # 可以添加自己的代码逻辑,在对象回收时调用该逻辑 pass ``` ### 关于可变序列传参的问题 ```python def add(a, b): a += b # 传递了可变序列就将a修改了(+=) return a class Company: def __init__(self, name, staffs=[]): self.name = name self.staffs = staffs def add(self, staff_name): self.staffs.append(staff_name) def remove(self, staff_name): self.staffs.remove(staff_name) if __name__ == '__main__': com1 = Company("com1", ["b1", "b2", "b3"]) # 传递列表进去,列表可以被修改 print(Company.__init__.__defaults__) com1.add("b4") com1.remove("b1") print(com1.staffs) com2 = Company("com2") com2.add("b5") print(com2.staffs) # ['b5'] print(Company.__init__.__defaults__) com3 = Company("com3") com3.add("b6") print(Company.__init__.__defaults__) print(com2.staffs) # ['b5', 'b6'] print(com3.staffs) # ['b5', 'b6'] print(com2.staffs is com3.staffs) # True # 结果打印错误原因: com2和com3 没有传递staffs列表参数,所以他们使用的是共同的默认list (staffs指向Company.__init__.__defaults__) com1传递了staffs列表所以不是指向默认列表了 a = 1 b = 2 c = add(a, b) print(c) print(a, b) a = [1, 2] b = [3, 4] c = add(a, b) print(c) # [1, 2, 3, 4] print(a, b) # [1, 2, 3, 4] a += b a被修改了 a = (1, 2) b = (3, 4) c = add(a, b) print(c) print(a, b) ``` ## 元类编程 ### property动态属性 ```python from datetime import date, datetime class User: def __init__(self, name, birthday): self.name = name self.birthday = birthday self._age = 0 # def get_age(self): # return datetime.now().year - self.birthday.year @property # 计算属性 ,将函数变为属性描述符 def age(self): # 用处:可以放入时区转换逻辑 return datetime.now().year - self.birthday.year @age.setter # 将函数以属性赋值的方式进行赋值 def age(self, value): self._age = value if __name__ == '__main__': user = User("b1", date(year=1987, month=1, day=1)) user.age = 30 print(user._age) # 30 print(user.age) # 32 ``` ### __getattr__ 和 __getattribute__ 魔法函数 ```python # __getattr__, __getaattribute__ # __getattr__ 就是在查找不到属性的时候调用 # __getaattribute__ 所有属性查找的入口 from datetime import date class User: def __init__(self, name, birthday, info): self.Name = name self.birthday = birthday self.info = info def __getattr__(self, item): # 实例未查找到该属性时调用的魔法函数 return self.info[item] # 能够在传入的字典查询到属性 def __getattribute__(self, item): # 实例查找属性优先进入该方法;所有属性的访问入口 return "c1" # 一般不要重写该方法,导致类实例化过程逻辑出错 if __name__ == '__main__': user = User("b1", date(year=1987, month=1, day=1), info={"company_name": "znufe"}) print(user.company_name) ``` ### 属性描述符和属性查找过程 ```python import numbers class IntField: """ 数据属性描述符,要实现__set__方法 属性描述符 实现下列三种方法之一即为属性描述符对象 控制属性赋值 """ def __get__(self, instance, owner): return self.value def __set__(self, instance, value): if not isinstance(value, numbers.Integral): raise ValueError("int value need") if value < 0: raise ValueError("positive value need") self.value = value def __delete__(self, instance): pass class NonDataIntField: """非数据属性描述符,只需要实现__get__方法""" def __get__(self, instance, owner): pass class User: age = IntField() if __name__ == '__main__': user = User() user.age = 18 # 进入Intfield的value中没有进入到user的实例中 ``` ```txt # 属性查找过程 如果user是某个类的实例,那么user.age(以及等价的getattr(user,'age')) 首先调用__getattribute__, 如果类定义了__getattr__方法, 那么在__getattribute__抛出 AttributeError的时候就会调用到__getattr__, 而对于描述符(__get__)的调用,则是发生在__getattribute__内部的。 user = User(), 那么user.age 顺序如下 1) 如果"age"是出现在User或者基类的__dict__中,且age是data descriptor, 那么调用其__get__方法,否则 2) 如果"age"出现在user的__dict__中,那么直接返回obj.__dict__['age'], 否则 3) 如果"age"出现在User或基类的__dict__中 3.1) 如果age是non-data descriptor,那么调用其__get__方法, 否则 3.2) 返回__dict__['age'] 4) 如果User有__getattr__方法,调用__getattr__方法,否则 5) 抛出AttributeError ``` ### __new__ 和 __init__的区别 ```python class User: def __new__(cls, *args, **kwargs): # 传递的类,可以自定义类的生成过程 print("in new") return super().__new__(cls) # 调用父类方法能够进入init函数 def __init__(self, name): # 传递的是对象 print("in init") self.name = name # new 是用来控制对象的生成过程,在对象生成之前 # init是用来完善对象的 # 如果new方法不返回对象, 则不会调用init函数 if __name__ == '__main__': user = User("a1") ``` ### 自定义元类 ```python # 类也是对象,type创建类的类 def create_class(name): if name == "user": class User: def __str__(self): return "user" return User elif name == "company": class Company: def __str__(self): return "company" return Company # type动态创建类 def say(self): return "i am user" # return self.name class Baseclass: def answer(self): return "i am baseclass" User = type("User", (Baseclass,), {"name": "user", "say": say}) # 类名称, 基类, 属性 # 什么是元类,元类是创建类的类,对象 <- class(对象) <- type class Metaclass(type): # 控制类对象实例化的过程 def __new__(cls, *args, **kwargs): return super().__new__(cls, *args, **kwargs) class User1(metaclass=Metaclass): # 控制实例化的过程 def __init__(self, name): self.name = name def __str__(self): return "user" # python中类的实例化过程,会首先寻找metaclass,通过metaclass去创建user类 # type去创建类对象 if __name__ == '__main__': # MyClass = create_class("user") # my_obj = MyClass() # print(type(my_obj)) my_obj = User1("b1") print(my_obj) ``` ### orm实践 1. 定义属性描述符用来表示字段类型 2. 设计元类控制类实例化过程,将类的变量和元信息整合为字典对象,用于后期sql语句拼接 3. 设计基类继承元类用于控制对象实例化时传入关键字参数进行实例化的情况 4. 获取表名,属性名,值进行sql拼接 ```python import numbers class Field: """字段类,用于判断属性是否为该类型还是属性为类自带的魔法方法""" pass class IntField(Field): # 属性描述符 def __init__(self, db_column=None, min_value=None, max_value=None): self._value = None self.min_value = min_value self.max_value = max_value self.db_column = db_column if min_value is not None: """参数检查""" if not isinstance(min_value, numbers.Integral): raise ValueError("min_value must be int") elif min_value < 0: raise ValueError("min_value must be positive int") if max_value is not None: if not isinstance(max_value, numbers.Integral): raise ValueError("min_value must be int") elif max_value < 0: raise ValueError("min_value must be positive int") if max_value is not None and min_value is not None: if min_value > max_value: raise ValueError("min_value must be smaller than max_value") def __get__(self, instance, owner): return self._value def __set__(self, instance, value): if not isinstance(value, numbers.Integral): raise ValueError("int value need") if value < self.min_value or value > self.max_value: raise ValueError("value must between min_value and max_value") self._value = value class CharField(Field): def __init__(self, db_column=None, max_length=None): self._value = None self.db_column = db_column if max_length is None: raise ValueError("you must specify max_length for charfield") self.max_length = max_length def __get__(self, instance, owner): return self._value def __set__(self, instance, value): if not isinstance(value, str): raise ValueError("str value need") if len(value) > self.max_length: raise ValueError("value len excess len of max_length") self._value = value class ModelMetaClass(type): """元类,控制类实例化过程,对实例添加额外信息,如提取类的变量""" def __new__(cls, name, bases, attrs, **kwargs): if name == "BaseModel": return super(ModelMetaClass, cls).__new__(cls, name, bases, attrs, **kwargs) fields = {} for key, value in attrs.items(): if isinstance(value, Field): fields[key] = value # 记录了所有数据表相关的列 attrs_meta = attrs.get("Meta", None) # 获取元信息 _meta = {} db_table = name.lower() # 默认表名为类名小写 if attrs_meta is not None: table = getattr(attrs_meta, "db_table", None) # 元信息中定义了db_table,覆盖原表名 if table is not None: db_table = table _meta["db_table"] = db_table attrs["_meta"] = _meta # 拼装字段 attrs["fields"] = fields del attrs["Meta"] return super().__new__(cls, name, bases, attrs, **kwargs) class BaseModel(metaclass=ModelMetaClass): def __init__(self, *args, **kwargs): for key, value in kwargs.items(): setattr(self, key, value) # 使用属性描述符对属性赋值 return super().__init__() def save(self): fields = [] values = [] for key, value in self.fields.items(): # 属性名 属性描述符对象 db_column = value.db_column # 获取属性描述符对象中的属性 if db_column is None: db_column = key.lower() fields.append(db_column) value = getattr(self, key) # 获取属性描述符属性的值 __get__方法 values.append(str(value)) sql = "insert {db_table}({fields}) value({values})".format(db_table=self._meta["db_table"], fields=",".join(fields), values=",".join(values)) pass class User(BaseModel): name = CharField(db_column="name", max_length=10) age = IntField(min_value=0, max_value=100) class Meta: db_table = "user" if __name__ == '__main__': user = User(name="b1", age=28) # user = User() # user.name = "b1" # 属性描述符set方法赋值 # user.age = 28 user.save() ``` ## 迭代器和生成器 ### python中的迭代协议 ```txt 1. 什么是迭代协议 2. 迭代器是什么? 迭代器是访问集合内元素的一种方式,一般用来遍历数据 3. 迭代器和以下标的访问方式不一样, 迭代器是不能返回的,迭代器只能一条一条访问数据,迭代器提供了一种惰性访问数据的方式 4. 可迭代的对象(Iterable)只需实现__iter__方法 5. 迭代器(Iterator)必须要实现__next__方法 ``` ### 迭代器和可迭代对象 ```python from collections.abc import Iterator, Iterable # 可迭代对象 # 迭代器的设计模式 ,使用迭代器去维护内部变量,可迭代对象__iter__返回一个迭代器 class Company(object): """可迭代对象""" def __init__(self, employee_list): self.employee = employee_list # def __getitem__(self, item): # # print(item) # return self.employee[item] def __iter__(self): """返回一个迭代器""" return MyIterator(self.employee) def __len__(self): return len(self.employee) class MyIterator(Iterator): """迭代器不支持切片""" def __init__(self, employee_list): self.iter_list = employee_list self.index = 0 # 维护下标 def __next__(self): # 真正返回迭代值的逻辑 try: word = self.iter_list[self.index] except IndexError: # 错误类型转换 raise StopIteration # for语句处理的错误类型 self.index += 1 return word if __name__ == '__main__': company = Company(["b1", "b2", "b3"]) # my_itor = iter(company) # while True: # try: # print(next(my_itor)) # except StopIteration: # pass # next(my_itor) print(isinstance(company, Iterable)) for item in company: print(item) ``` ### 生成器函数的使用 1. 生成器函数返回值以yield关键字进行返回,生成器函数属于生成器对象 2. 生成器对象,python编译字节码的时候就产生了 3. 生成器对象为惰性求值, 延迟求值提供可能 ```python def gen_fib2(index): n, a, b = 0, 0, 1 while n < index: yield b a, b = b, a + b n += 1 for data in gen_fib2(10): print(data) # 1 1 2 3 5 8 13 21 34 55 ``` ### 生成器原理 ```python # 1. python 中函数的工作原理 import inspect frame = None def foo(): bar() def bar(): global frame frame = inspect.currentframe() # python.exe会用一个叫做PyEval_EvalFramEx(c函数)去执行foo函数, # 首先会创建一个栈帧(stack frame) # python 一切皆对象, 栈帧对象,栈帧对象的上下文会运行字节码对象, # 字节码对象全局唯一 # 当foo调用子函数,又会创建一个栈帧,新创建的栈帧上下文中去运行bar的字节码 # 所有的栈帧都是分配在堆内存上 # 堆内存的特性:不去释放堆内存则一直会存放在内存中 # 栈帧可以独立于调用者存在 import dis # 反编译查看字节码 # print(dis.dis(foo)) foo() print(frame.f_code.co_name) caller_frame = frame.f_back # f_back 指向调用者 print(caller_frame.f_code.co_name) def gen_func(): yield 1 name = "b1" yield 2 age = 30 return "i1" # 生成器对象分配在堆内存中,所以可以像函数的栈帧一样,独立于调用者存在。 # 拿到生成器对象,每次调用,都会重新生成栈帧,只要拿到生成器对象都会控制它继续往前走 # 在任何地方任何函数任何模块只要拿到生成器对象都可以控制,(暂停,继续) gen = gen_func() print(dis.dis(gen)) print(gen.gi_frame.f_lasti) # 记录最近执行代码的字节码的位置 print(gen.gi_frame.f_locals) # locals变量 next(gen) # 控制生成器的暂停 print(gen.gi_frame.f_lasti) print(gen.gi_frame.f_locals) next(gen) print(gen.gi_frame.f_lasti) print(gen.gi_frame.f_locals) ``` ### 利用生成器读取大文件数据 ```python # 500G(一行) 文件 读取写入数据库 def myreadlines(f, newline): # 句柄, 分割符 buf = "" # 类似于缓存 while True: while newline in buf: # 查询分割符是否在缓存数据中 pos = buf.index(newline) # 定位缓存中分隔符位置 yield buf[:pos] # 输出分割符之前的语句 buf = buf[pos + len(newline):] # 缓存数据进行截断操作 chunk = f.read(4096) # 读取文件 if not chunk: # 已经读到文件结尾 ,文件边缘判断,边界条件 结尾时没有分割符 yield buf # 输出最后一句 break # 边界终止 buf += chunk # 文件加入到缓存 with open("input.txt") as f: for line in myreadlines(f, "{|}"): print(line) ``` ## Python Socket编程 1. 实现服务端与客户端的长连接通讯 ```python # server端 import socket import threading server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 指定类型, 类型对应的协议 server.bind(('0.0.0.0', 8000)) # 协议,地址,端口 tuple 绑定到0.0.0.0 是自动搜网卡地址 server.listen() # server用来监听 def handle_sock(sock, addr): while True: data = sock.recv(1024) # data返回的是bytes类型 res = data.decode("utf8") print(res) if res == "bye" or res == "exit": break re_data = input() sock.send(re_data.encode("utf8")) sock.close() # 获取从客户端发送的数据 # 一次获取1k的数据 while True: sock, addr = server.accept() # 接收到连接请求得到sock和address sock 套接字对象 用来发送 # 用线程去处理新接收到的连接(用户) client_thread = threading.Thread(target=handle_sock, args=(sock, addr)) client_thread.start() # data = sock.recv(1024) # data返回的是bytes类型 # print(data.decode("utf8")) # re_data = input() # sock.send(re_data.encode("utf8")) # sock.send("hello {}".format(data.decode("utf8")).encode("utf8")) # socket 只能发送bytes类型 # server.close() # sock.close() ``` ```python # client端 import socket client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect(("127.0.0.1", 8000)) # 连接 while True: re_data = input() client.send(re_data.encode("utf8")) data = client.recv(1024) # 客户端先发送数据所以打印数据置后 print(data.decode("utf8")) if re_data == "bye" or re_data == "exit": break client.close() # client.send("b1".encode("utf8")) # data = client.recv(1024) # print(data.decode("utf8")) # client.close() ``` 2. socket实现http请求 ```python # requests -> urlib -> socket import socket from urllib.parse import urlparse def get_url(url): # 通过socket请求html url = urlparse(url) host = url.netloc path = url.path if path == "": path = "/" # 建立socket连接 进行http请求 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect((host, 80)) # 连接 print("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host)) # http协议发送的格式 client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8")) data = b"" while True: d = client.recv(1024) if d: data += d else: break data = data.decode("utf8") html_data = data.split("\r\n\r\n")[1] print(data) print(html_data) client.close() ``` ## 多线程、多进程和线程池编程 #### 线程与进程 ```txt 操作系统能够切换和调度的最小单元是线程 线程依赖于进程 对于io操作而言,多线程和多进程性能差别不大 线程调度相较于进程调度更加轻量化 ``` ### python中的GIL ```txt GIL global interpreter lock python 中一个线程对应于c语言中的一个线程 gil使得同一时刻只有一个线程运行在一个cpu上执行字节码, 无法将多个线程映射到多个cpu上执行 下述案例,可以看出打印的全局变量并不固定,说明GIL存在内部自动释放的机制。 gil会根据字节码执行的行数或者时间片释放gil,gil在遇到io操作时主动释放 ``` ```python import threading total = 0 def add(): # dosomething1 # io操作 # dosomething2 global total for i in range(1000000): total += 1 def desc(): global total for i in range(1000000): total -= 1 thread1 = threading.Thread(target=add) thread2 = threading.Thread(target=desc) thread1.start() thread2.start() thread1.join() thread2.join() # 数值不稳定 print(total) ``` ### 多线程编程实现的两种方式 ```python # 1. 通过Thread类实例化 import time import threading def get_detail_html(url): print("get detail html started") time.sleep(2) print("get detail html end") def get_detail_url(url): print("get detail url started") time.sleep(4) print("get detail url end") # 2. 通过继承Thread来实现多线程 class GetDetailHtml(threading.Thread): def __init__(self, name): super().__init__(name=name) def run(self): print("get detail html started") time.sleep(2) print("get detail html end") class GetDetailUrl(threading.Thread): def __init__(self, name): super().__init__(name=name) def run(self): print("get detail url started") time.sleep(4) print("get detail url end") if __name__ == '__main__': # thread1 = threading.Thread(target=get_detail_html, args=("",)) # threading类实例化只能传递函数名,参数使用tuple传递 # thread2 = threading.Thread(target=get_detail_url, args=("",)) # threading类实例化只能传递函数名,参数使用tuple传递 # thread1.setDaemon(True) # 设置为守护线程,主线程关闭掉,守护线程关闭掉 # thread2.setDaemon(True) thread1 = GetDetailHtml("get_detail_html") thread2 = GetDetailUrl("get_detail_url") start_time = time.time() thread1.start() thread2.start() thread1.join() # 阻塞,等两个线程执行后再进行打印 thread2.join() # 整个时间为单一线程的最大时间,体现了并发 # print("last time {}".format(time.time() - start_time)) print(f"last time {time.time() - start_time}") # 时间为0 # 存在三个线程 两个创建的线程以及一个记录时间的主线程 # 引申的需求,当主线程退出时,子线程kill掉 使用setDaemon 设置子线程为守护线程 ``` ### 线程间的通讯 1. 全局变量,将线程间用于通讯可以设置全局变量,全局变量放置另外的py文件中易于维护 ```python import time import threading from advanced.chapter10 import variables # 全局变量独立存储 不能直接导入变量必须以.的形式引入变量 # detail_url_list = [] total = 0 def get_detail_html(): """ 获取文章详情页 """ detail_url_list = variables.detail_url_list while True: if len(detail_url_list): t_time = time.time() url = detail_url_list.pop() # 线程不安全 # for url in detail_url_list: print("get detail html started\r\n") time.sleep(1) print(url) time.sleep(1) print("get detail html end\r\n") time.sleep(2) print(f" threading last time {time.time() - t_time}\r\n") global total total += 1 if total == 20: break def get_detail_url(): """ 获取文章列表页 """ detail_url_list = variables.detail_url_list while True: s_time = time.time() print("get detail url started\r\n") time.sleep(1) for i in range(20): detail_url_list.append(f"http://projects.edu.com/{i}\r\n") print(f"threading last time {time.time() - s_time}\r\n") print("get detail url end\r\n") break # 实现线程间通讯的方法 # 1. 共享变量: 声明全局变量,在各线程中使用 存在问题:线程操作不安全 if __name__ == '__main__': thread_detail_url = threading.Thread(target=get_detail_url) # threading类实例化只能传递函数名,参数使用tuple传递 start_time = time.time() threads = [] for i in range(10): html_thread = threading.Thread(target=get_detail_html) html_thread.start() threads.append(html_thread) thread_detail_url.start() thread_detail_url.join() for j in threads: j.join() print(f"total last time {time.time() - start_time}\r\n") ``` 2. 使用消息队列的方式进行通讯 ```python # 通过queue的方法进行线程间同步 from queue import Queue import time import threading total = 0 def get_detail_html(queue): """ 获取文章详情页 """ while True: if not queue.empty(): url = queue.get() # 阻塞方法 # for url in detail_url_list: print("get detail html started") time.sleep(1) print(url) time.sleep(1) print("get detail html end") global total total += 1 if total == 20: # 数据总量,跳出循环条件 break def get_detail_url(queue): """ 获取文章列表页 """ while True: print("get detail url started") time.sleep(1) for i in range(20): queue.put(f"http://projects.edu.com/{i}") print("get detail url end") break if __name__ == '__main__': detail_url_queue = Queue(maxsize=1000) thread1 = threading.Thread(target=get_detail_url, args=(detail_url_queue,)) # threading类实例化只能传递函数名,参数使用tuple传递 threads = [] for i in range(10): html_thread = threading.Thread(target=get_detail_html, args=(detail_url_queue,)) html_thread.start() threads.append(html_thread) start_time = time.time() thread1.start() thread1.join() # detail_url_queue.task_done() # detail_url_queue.join() # 阻塞主线程 for j in threads: j.join() print(f"last time: {time.time() - start_time}") ``` 动态线程创建及动态线程阻塞,使用list存储线程,遍历阻塞。 ### 线程同步(Lock、RLock、Semaphores、Condition) #### 线程同步机制及原因 ```python # 线程同步机制 # 同一资源多线程同步修改导致数值不稳定 def add1(a): a += 1 def desc(a): a -= 1 """字节码解析 0 LOAD_FAST 0 (a) 2 LOAD_CONST 1 (1) 4 INPLACE_ADD 6 STORE_FAST 0 (a) 1. load a 2. load 1 3. + 4. 赋值给a """ # 一行字节码执行后GIL释放切换到另一线程, 第四步赋值语句时出错a= 1/-1 而不为0 # web,电商网站中库存问题即为该类型问题 import dis # print(dis.dis(add1)) # print("=========") # print(dis.dis(desc)) ``` #### Lock及RLock ```python import threading from threading import Lock, RLock # RLock 可重入的锁 解决线程内部第一类死锁问题 # 在同一个线程中,可以连续调用多次acquire,一定要acquire的次数和release相等 # 线程间还是竞争的,rlock可以在线程内部嵌套函数使用 total = 0 lock = threading.Lock() rlock = threading.RLock() def add(): # dosomething1 # io操作 # dosomething2 global total global lock global rlock for i in range(1000000): # lock.acquire() # 添加锁 # lock.acquire() # 死锁原因之一 # total += 1 # lock.release() # 释放锁 rlock.acquire() # 添加锁 rlock.acquire() total += 1 rlock.release() # 释放锁 rlock.release() # 释放锁 def desc(): global total global lock global rlock for i in range(1000000): # lock.acquire() # 添加锁 # total -= 1 # lock.release() # 释放锁 rlock.acquire() # 添加锁 total -= 1 rlock.release() # 释放锁 thread1 = threading.Thread(target=add) thread2 = threading.Thread(target=desc) thread1.start() thread2.start() thread1.join() thread2.join() print(total) ``` #### 死锁出现原因 ```python # 1. 用锁会影响性能 # 2. 锁会引起死锁 # 死锁:1) 未release就继续acquire,函数及子函数连续调用acquire # 死锁:2) 互相等待 A(a,b) """ A (a, b) acquire(a) acquire(b) B(b, a) acquire(b) acquire(a) 假定B, A 线程请求资源a,b A先请求a, B先请求b,则后续会导致循环等待 资源竞争的死锁 """ ```` ### 线程同步condition使用及源码解析 #### condition使用的原因及lock使用的缺陷 ```python from threading import Condition import threading import time # Condition : 条件变量,用于复杂的线程间同步 lock = threading.Lock() class XiaoAi(threading.Thread): def __init__(self, lock): self.lock = lock super(XiaoAi, self).__init__(name="小爱") def run(self) -> None: self.lock.acquire() print(f"{self.name}: 在") self.lock.release() # time.sleep(1) self.lock.acquire() print(f"{self.name}: 没吃") self.lock.release() class TianMao(threading.Thread): def __init__(self, lock): self.lock = lock super(TianMao, self).__init__(name="天猫精灵") def run(self) -> None: self.lock.acquire() print(f"{self.name}: 小爱同学") self.lock.release() # time.sleep(1) self.lock.acquire() print(f"{self.name}: 吃了吗") self.lock.release() if __name__ == '__main__': xiaoai = XiaoAi(lock) tianmao = TianMao(lock) tianmao.start() xiaoai.start() """ 天猫精灵: 小爱同学 天猫精灵: 吃了吗 小爱: 在 小爱: 没吃 """ # 失败原因:gil根据时间片切换由于线程中锁对象内部的执行逻辑时间较少, # gil未进行线程切换时,锁已经释放并执行下一次线程内部获得锁操作 # 可发现加入time.sleep该问题得已解决,应征该问题出现原因。 ``` #### condition使用的方式和原理 ```python import threading # 通过condition完成按序执行 from threading import Condition lock = threading.Lock() cond = threading.Condition() class XiaoAi(threading.Thread): def __init__(self, cond): self.cond = cond super(XiaoAi, self).__init__(name="小爱") def run(self) -> None: with self.cond: self.cond.wait() print(f"{self.name}: 在") self.cond.notify() self.cond.wait() print(f"{self.name}: 没吃") self.cond.notify() self.cond.wait() print(f"{self.name}: 好") self.cond.notify() class TianMao(threading.Thread): def __init__(self, cond): self.cond = cond super(TianMao, self).__init__(name="天猫精灵") def run(self) -> None: with self.cond.acquire(): print(f"{self.name}: 小爱同学") self.cond.notify() self.cond.wait() print(f"{self.name}: 吃了吗") self.cond.notify() self.cond.wait() print(f"{self.name}: 吃火锅好吗") self.cond.notify() self.cond.release() if __name__ == '__main__': xiaoai = XiaoAi(cond) tianmao = TianMao(cond) # condition的启动顺序很重要 # 先起wait的 # tianmao先start时 """ print(f"{self.name}: 小爱同学") self.cond.notify() 小爱还没有start起来 wait只有notify才能唤醒 而此时天猫已经发过notify了 所以调用小爱线程后会一直等待无法被唤醒 """ # 在调用with self.cond: 后才能调用wait或notify方法 # condition有两层锁,一把底层锁,会在线程调用了wait方法的时候释放, # 上面的锁会在每次调用wait的时候分配一把并放入到cond的等待队列中,等到notify方法的唤醒 xiaoai.start() tianmao.start() ``` ### Semaphore 信号量 使用场景:用于控制进入数量的锁,相当于流量控制,控制一次允许进入的数量 文件、读,写,写一般是只允许一个线程写,读可以允许有多个 ```python # 模拟爬虫 import threading import time class HtmlSpider(threading.Thread): def __init__(self, url, sem): super(HtmlSpider, self).__init__() self.url = url self.sem = sem def run(self): time.sleep(2) print("got html text success") self.sem.release() class UrlProducer(threading.Thread): def __init__(self, sem): super().__init__() self.sem = sem def run(self): for i in range(20): self.sem.acquire() # sem对象的传入大于0的值时,执行一次数值减1,减至0变为阻塞 # 必须要在调用的线程中release html_thread = HtmlSpider(f"www.baidu.com/{i}", self.sem) html_thread.start() if __name__ == '__main__': sem = threading.Semaphore(3) # 每次放三个 url_producer = UrlProducer(sem) url_producer.start() ``` ### concurrent 线程池编码 ```python # 线程池 使用的原因 # 需求; 主线程中获取某一个线程的状态或者某一个任务的状态,以及返回值 # 当一个线程完成的时候我们主线程能立即知道 # futures可以让多线程和多进程编码接口一致 ``` ```python from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED import time def get_html(times): time.sleep(times) print(f"get page {times} success") return times executor = ThreadPoolExecutor(max_workers=2) # 获取已经成功的task的返回 urls = [3, 2, 4] # 动态获取所有任务,进行执行 all_task = [executor.submit(get_html, (url)) for url in urls] print(type(all_task[0])) # submit提交线程池后返回一个future对象 wait(all_task, return_when=FIRST_COMPLETED) # wait()阻塞主线程,return_when参数是阻塞规则 第一个子线程执行完就执行主线程 print("main") # for future in as_completed(all_task): # 遍历已经成功的task # data = future.result() # print(f"get {data} page success") # 通过executor的map获取已经完成的task的值 # 返回结果的顺序与传入的顺序一致 # for data in executor.map(get_html, urls): # print(f"get {data} page success") # 单步穷举范例 # 通过submit函数提交执行的函数到线程池中, submit 是非阻塞的立即返回 # task1 = executor.submit(get_html, (3)) # task2 = executor.submit(get_html, (2)) # done方法用于判定某个任务是否完成 # print(task1.done()) # # print(task2.cancel()) # 任务在执行中或执行完成是无法取消的 # time.sleep(3) # print(task1.done()) # # # result方法可以获取task的执行结果 # print(task1.result()) # 阻塞 ``` ### ThreadPoolExecutor源码分析 #### submit源码分析 ```python executor = ThreadPoolExecutor(max_workers=2) task1 = executor.submit(get_html, (3)) ``` ```python # submit源码 def submit(self, fn, *args, **kwargs): with self._shutdown_lock: # 锁,为了保证后续代码安全 if self._broken: raise BrokenThreadPool(self._broken) if self._shutdown: raise RuntimeError('cannot schedule new futures after shutdown') if _shutdown: raise RuntimeError('cannot schedule new futures after' 'interpreter shutdown') f = _base.Future() # 当提交任务进入线程池后,会生成一个future对象 w = _WorkItem(f, fn, args, kwargs) # 工作单元 将future,执行任务,参数,放入了WorkItem, self._work_queue.put(w) # 将工作单元放入工作队列 self._adjust_thread_count() # 调整线程数量 return f submit.__doc__ = _base.Executor.submit.__doc__ ``` ### 多进程与多线程应用场景与性能对比 ```python # 多进程编程 # 多线程 # 耗cpu的操作,无法利用多核特性,无法充分利用cpu的性能 # 所以耗cpu的操作,用多进程编程,对于io操作来说,使用多线程编程 # 对于操作系统而言进程切换代价要高于线程调度 # 1. 对于耗费cpu的操作,(计算类),多进程优于多线程 from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ProcessPoolExecutor import time def fib(n): if n <= 2: return 1 return fib(n - 1) + fib(n - 2) # if __name__ == '__main__': # """ # windows 下如果不放在if __name__ == '__main__':执行进程池代码,会抛出以下异常,而linux不会 # concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending. # # """ # with ProcessPoolExecutor(4) as executor: # all_task = [executor.submit(fib, (num)) for num in range(25, 40)] # start_time = time.time() # for future in as_completed(all_task): # data = future.result() # print(f"result: {data} ") # # print(f"last time is :{time.time() - start_time}") # # ''' # all_task = [executor.submit(fib, (num)) for num in range(25, 40)] # ProcessPoolExecutor(3): 38.00974655151367 # ThreadPoolExecutor(20): 71.29457139968872 # ''' # 对于io操作来说,多线程优于多进程 def random_sleep(n): time.sleep(n) return n if __name__ == '__main__': ''' 模拟io操作 ''' with ThreadPoolExecutor(3) as executor: all_task = [executor.submit(random_sleep, (num)) for num in [2]*30] start_time = time.time() for future in as_completed(all_task): data = future.result() print(f"result: {data} ") print(f"last time is :{time.time() - start_time}") ``` ### 多线程编程 #### 子进程与父进程 ```python import os import time # fork只能用于linux/unix中 pid = os.fork() # 新建子进程 # 子进程将父进程的数据拷贝 # 子进程中将fork后的代码重新运行 print("b1") # if pid == 0: # pid为0是子进程,pid为数值是主进程 print(f"子进程是:{os.getpid()},父进程是:{os.getppid()}.") else: print(f'我是父进程:{pid}') # 如果没有sleep则父进程运行完直接退出,此时子进程无法退出 # 增加sleep,当子进程结束后父进程未退出,父进程结束后,子进程会被kill掉 time.sleep(2) ''' b1 我是父进程: 24474 b1 子进程:24474, 父进程是24473 ''' ``` #### 进程池ProcessPoolExecutor ```python from concurrent.futures import ProcessPoolExecutor import time def get_html(n): time.sleep(n) print("sub_progress success") return n if __name__ == '__main__': progress = multiprocessing.Process(target=get_html, args=(2,)) print(progress.pid) progress.start() print(progress.pid) # 进程id progress.join() print("main progress end") ``` #### multiprocessing多进程编程 ```python import multiprocessing import time def get_html(n): time.sleep(n) print("sub_progress success") return n if __name__ == '__main__': pool = multiprocessing.Pool(multiprocessing.cpu_count()) # 进程数等于cpu数 # result = pool.apply_async(get_html, args=(3,)) # # 等待所有任务完成 # pool.close() # pool.join() # 使用join之前必须要close让pool不再接收新任务 # print(result.get()) # 获得任务的返回值 # imap 同线程池中的map方法一致,返回值顺序与传入顺序一致 # for result in pool.imap(get_html, [1, 5, 3]): # print(f"{result} sleep success") # imap_unordered 谁先完成打印谁的结果 for result in pool.imap_unordered(get_html, [1, 5, 3]): print(f"{result} sleep success") ``` ### 进程间通讯Manager,Pipe,Queue 1. python中的三个Queue的使用场景 ```python from queue import Queue from multiprocessing import Queue from multiprocessing import Manager queue = Manager().Queue(10) ``` 第一种Queue可以用于线程间通讯 第二种Queue可以用于进程间通讯而不能用于pool进程池间通讯 第三种Manager().Queue(10)可以用于进程池通讯 ```python from multiprocessing import Queue queue = Queue(10) def producer(queue): queue.put("a") time.sleep(2) def consumer(queue): time.sleep(2) data = queue.get() print(data) if __name__ == '__main__': queue = Queue(10) my_producer = Process(target=producer, args=(queue, )) my_consumer = Process(target=consumer, args=(queue, )) my_producer.start() my_consumer.start() my_producer.join() my_consumer.join() ``` 2. 传统的共享全局变量不能适用于多进程编程,可以适用于多线程 多进程中数据完全隔离,当fork后,进程将数据完全复制一份到子进程中,两边数据做修改的时候不会影响 3. 通过pipe实现两个进程间通讯 pipe 只能适用于两个指定的进程!!! pipe 性能高于queue ```python from multiprocessing import Process, Queue, Pool, Manager, Pipe def producer(pipe): pipe.send("a") time.sleep(2) def consumer(pipe): time.sleep(2) data = pipe.recv() print(data) if __name__ == '__main__': reversed_pipe, send_pipe = Pipe() # pipe 只能适用于两个指定的进程!!! # pipe 性能高于queue my_producer = Process(target=producer, args=(send_pipe,)) my_consumer = Process(target=consumer, args=(reversed_pipe,)) my_producer.start() my_producer.join() my_consumer.start() my_consumer.join() ``` 4. 进程间共享内存操作 Manager 中有常见的数据类型 使用时注意数据同步,Manager中LockRLock的使用 ```python from multiprocessing import Process, Queue, Pool, Manager, Pipe def add_data(p_dict, k, v): p_dict[k] = v if __name__ == '__main__': progress_dict = Manager().dict() first_progress = Process(target=add_data, args=(progress_dict, "b1", 23)) second_progress = Process(target=add_data, args=(progress_dict, "b2", 24)) first_progress.start() second_progress.start() first_progress.join() second_progress.join() print(progress_dict) ``` ## 协程和异步io ### 并发、并行 并发:一个时间段内,有几个程序在同一个cpu上运行,但是任意时刻只有一个程序在cpu上运行。(cpu核心一个时间点只能运行一个程序,时间段内cpu可以切换程序) 并行:任意时刻点上,有多个程序同时运行在多个cpu上 同步:代码调用IO操作时,必须等待IO操作完成才返回的调用方式 异步:代码调用IO操作时,不必等IO操作完成就返回的调用方式 阻塞:调用函数时候当前线程被挂起 非阻塞:调用函数的时候当前线程不会被挂起,而是立即返回 ### C10K问题和io多路复用 C10K 问题 Unix下五种I/O模型 阻塞式I/O :例如socket编程中的connect()方法就是阻塞式I/O,不返回的话线程会一直挂在那里 非阻塞式I/O:调用后立即返回无等待期,例如socket编程中setblocking(False),如果connect三次握手没有完成则send会报错,所以需要一直轮询连接状态,轮询需要耗cpu。 使用场景:做与需要返回结果的代码无关的任务。 反复请求数据准备状态也需要消耗cpu I/O多路复用:select,poll, epoll。select调用后返回哪些socket或文件句柄已经准备好了 select也是一个阻塞式方法,select与while和for区别是可以监听一个socket,句柄列表,哪个好了复制(数据从操作系统内存复制到用户应用内存)哪个数据。select没有进行轮询。数据内核从复制到应用空间的时间耗费无法节省 信号驱动式I/O: 异步I/O(POSIX的aio_系列函数):aio_read。将数据从内核复制到用户缓存中时才去通知处理 select,poll,epoll都是IO多路复用的机制,I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般数读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。 ##### select: select函数监视文件描述符分三类,分别是writefds,readfds和exceptfds。调用后select函数会阻塞,知道有描述符就绪(有数据,可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以通过遍历fdset,来找到就绪的描述符。 select目前几乎在所有的平台上支持也是它的一个优点。select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低。 ##### poll 不同于select使用三个位图来表示三个fdset的方式,poll使用一个pollfd的指针实现。 pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式。同时,pollfd并没有最大数量限制(但是数量过大后性能也是会下降)。和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。 从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述数量的增长,其效率也会线性下降 ##### epoll 首先epoll在Linux下支持,在windows下不支持 epoll是在2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的时间存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次 #### epoll和select的使用场景 例如网站 在并发高的情况下,连接活跃度不是很高,epoll比select好 例如游戏 并发性不高,同时连接活跃度高的情况下,select不如epoll好 #### 通过非阻塞式I/O完成http请求 非阻塞式I/O需要不停的查询连接状态 ```python import socket from urllib.parse import urlparse def get_url(url): # 通过socket请求html url = urlparse(url) host = url.netloc path = url.path if path == "": path = "/" # 建立socket连接 进行http请求 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.setblocking(False) # 立即返回 非阻塞式I/O, 缺陷 try: client.connect((host, 80)) # 连接 阻塞不会消耗cpu except BlockingIOError as e: pass # 如果采用非阻塞不停的询问连接是否建立好,需要while循环不同的去检查状态,只能去监听一个socket状态 # 非阻塞式I/O使用情景不依赖之前的链接做计算任务或再次发起其它的连接接请求 # print("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host)) # http协议发送的格式 while True: try: client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8")) break except OSError as e: pass data = b"" while True: try: d = client.recv(1024) except BlockingIOError as e: continue if d: data += d else: break data = data.decode("utf8") html_data = data.split("\r\n\r\n")[1] print(data) print(html_data) client.close() if __name__ == '__main__': get_url("https://www.baidu.com") ``` ### 使用selector实现状态监听+回调的方式完成http请求 该方法优势: 可以选择io复用的方法,提供注册机制 select + 回调 + 事件循环 并发性高 使用单线程,不存在线程切换的开销 ```python import socket from urllib.parse import urlparse # DefaultSelector是封装select的提供注册方法 from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ # 可以选择io复用的方法,提供注册机制 # select + 回调 + 事件循环 # 并发性高 # 使用单线程,不存在线程切换的开销 selector = DefaultSelector() # 使用select完成http请求 urls = [] stop = False class Fetcher: def connected(self, key): # 回调函数, 当监听到写事件后需要的操作 # 回调函数中需要注销 selector.unregister(key.fd) # 不再需要try pass,因为启用了事件监听,已经是数据就绪的状态 self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf8")) # 注册 监听是否是可写的状态 selector.register(self.client.fileno(), EVENT_READ, self.readable) def readable(self, key): # data = b"" d = self.client.recv(1024) if d: self.data += d else: selector.unregister(key.fd) data = self.data.decode("utf8") html_data = data.split("\r\n\r\n")[1] print(data) print(html_data) self.client.close() # urls中一个url处理完成就移除url urls.remove(self.spider_url) # 判断urls为空 if not urls: global stop stop = True def get_url(self, url): self.spider_url = url url = urlparse(url) self.host = url.netloc self.data = b"" self.path = url.path if self.path == "": self.path = "/" # 建立socket连接 进行http请求 self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.client.setblocking(False) # 立即返回 非阻塞式I/O try: self.client.connect((self.host, 80)) # 连接 阻塞不会消耗cpu except BlockingIOError as e: pass # 注册 监听是否是可写的状态 # client.fileno() 文件描述符, 监听状态,回调函数名 selector.register(self.client.fileno(), EVENT_WRITE, self.connected) def loop(): # 事件循环,不停的请求socket的状态来调用对应的回调函数 # 1. select本身是不支持register模式 selector对select进行了封装使其支持register # 拿到某一socket句柄时就可以知道该使用哪个回调方法 # 2. socket状态变化后的回调是由程序员完成的 # 所以要写事件循环函数 while not stop: # windows 下默认是select,select接收到空列表时会报错 # linux下并不会报错 ready = selector.select() # 返回值是一个列表,包含了元组key包含一个nametuple,为一个选择键对象,将文件描述符等于key进行关联 for key, mask in ready: # 拆包 call_back = key.data # 找到回调函数 call_back(key) # key的fd是client.fileno()返回值socket句柄 # 回调+事件循环+select/poll/epoll if __name__ == '__main__': fetcher = Fetcher() # fetcher.get_url("https://www.baidu.com") # 并发性能测验 import time start_time = time.time() for url in range(20): url = f"http://jsonplaceholder.typicode.com/posts/{url}" urls.append(url) fetcher = Fetcher() fetcher.get_url(url) loop() print("finnal time", time.time() - start_time) ``` ### 回调方式的问题 1. 如果回调函数执行不正常该如何? 2. 如果回调里面还要嵌套回调怎么办?要嵌套很多层怎么办? 3. 如果嵌套了多层,其中某个环节出错了会造成什么结果? 4. 如果有个数据需要被每个回调都处理怎么办? 5. 怎么使用当前函数中的局部变量 痛点: 1. 可读性差 2. 共享状态管理困难 3. 异常处理困难 ### 协程 C10M: 如何利用8核心CPU,64G内存,在10GBPS的网络上保持1000万并发连接 痛点: 1. 回调模式编码复杂度高 2. 同步编程的并发性不高 3. 多线程编程需要线程间同步,lock会降低并发性能 需求: 1. 采用同步的方式去编写异步的代码 2. 使用单线程去切换任务: 2-1. 线程是由操作系统切换的,单线程切换意味着我们需要程序员去调度任务 2-2. 不再需要锁,并发高,如果单线程内切换函数,性能远高于线程切换,并发性更高 传统函数调用过程 A->B->C 我们需要一个可以暂停的函数,并且可以在适当的时候回复该函数的继续执行 协程 -> 有多个入口的函数,可以暂停的函数,可以暂停的函数(可以向暂停的地方传入值) ### 生成器的进阶特性 ```python def gen_func(): # 1. 可以产出值; 2. 可以接收值(调用方传递进来的值) html = yield "http://projectsedu.com" # yield 在表达式右边,有一个特性,将值传递给生成器的内部 print(html) yield 2 yield 3 return "b1" # 1. throw 传入一个异常, close #1. 生成器不止可以产出值,还可以接收值 if __name__ == '__main__': gen = gen_func() # 在调用send发送非none值之前,我们必须启动一次生成器,方式有两种1. gen.send(None) 2. next(gen) # 1. 启动生成器方式有两种, next(), send # url = gen.send("bobby") # TypeError: can't send non-None value to a just-started generator url = gen.send(None) # # download url html = "b2" print(gen.send(html)) # send方法可以传递值进入生成器内部,同时还可以重启生成器执行到下一个yield位置 # print(next(gen)) # print(next(gen)) # print(next(gen)) # for i in gen: # print(i) ``` gen.close() 关闭生成器 ```python def gen_func(): html = yield "http://projectsedu.com" # print(html) yield 2 yield 3 return "b1" if __name__ == '__main__': gen = gen_func() print(next(gen)) gen.close() # 关闭生成器, 再执行next会抛异常 StopIteration next(gen) # GeneratorExit是继承自BaseException而非Exception ``` gen.throw() 向生成器中传递异常 ```python def gen_func(): try: html = yield "http://projectsedu.com" except Exception as e: # 处理throw传递进来的异常 pass # print(html) yield 2 yield 3 return "b1" if __name__ == '__main__': gen = gen_func() print(next(gen)) gen.throw(Exception, "download error") # 往生成器中添加指定异常 print(next(gen)) gen.throw(Exception, "download error") ```