# -*- coding: utf-8 -*-
'''
Author: Yogurt_cry
Date: 2022-05-04 15:55:46
LastEditors: Yogurt_cry
LastEditTime: 2022-05-05 18:18:38
Description: Fetch the 2021 administrative divisions from the National Bureau of Statistics (NBS) website and write them to an Excel file
FilePath: \address-recognition-server\GetAreaDataFromNBS.py
'''
import os
import re
from bs4 import BeautifulSoup
from datetime import datetime
from time import sleep
from openpyxl import Workbook
from multiprocessing import Pool
from requests_html import HTMLSession
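# Assumed third-party dependencies (PyPI names):
#   pip install requests-html beautifulsoup4 lxml openpyxl
# Note: requests_html's render() is backed by pyppeteer, which downloads a
# Chromium build the first time it is used.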
'''
China is vast and every province contains a large number of administrative
divisions, so one long-running crawl is prone to failed requests and can hang
on timeouts. To keep the data complete while still improving throughput, each
province is downloaded in its own process; once all of them have finished,
the per-province files are merged into a single data set.
'''
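# Note: multiprocessing on Windows uses the 'spawn' start method, which
# re-imports this module in every worker process; the `if __name__ == '__main__'`
# guard at the bottom of the file is therefore required, not optional.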
def main():
    print('Step 1: fetch administrative division data from the NBS website')
    url = 'http://www.stats.gov.cn/tjsj/tjbz/tjyqhdmhcxhfdm/2021'
    provinceList = __getAreaInfoByLoop(url, [], hostUrl = url, type = 'province', isLoop = True)
    __provinceList = []
    # Opening a few more processes usually raises throughput a little, but not
    # always; it mostly depends on what the machine can take, since the workers
    # lean heavily on the CPU. The test machine's CPU is an i7-9700K.
    processPool = Pool(10)
    for item in provinceList:
        __url, __hostUrl, __nameList = item
        __name = __nameList[0]
        if __name not in __provinceList:
            # A province whose cache file already exists was downloaded by an
            # earlier run and is only re-merged, not re-fetched
            if not os.path.exists('%s.log' % __name):
                processPool.apply_async(__singleProvince, (__url, __hostUrl, __nameList))
                # __singleProvince(__url, __hostUrl, __nameList)
            __provinceList.append(__name)
    processPool.close()
    processPool.join()
    print('Step 2: merge the per-province cached data')
    dataList = []
    for item in __provinceList:
        with open('%s.log' % item, 'r', encoding = 'utf-8') as f: dataList += f.readlines()
        # os.remove('%s.log' % item)
    __toExcel(dataList)
    print('Done')
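# A minimal sketch (hypothetical, not used by the script) of how the pool calls
# in main() could surface worker errors: apply_async() silently swallows
# exceptions unless the AsyncResult objects are kept and queried.
#
#     results = [ processPool.apply_async(__singleProvince, args) for args in taskArgs ]
#     processPool.close()
#     processPool.join()
#     for r in results:
#         r.get()  # re-raises any exception that occurred in the worker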
# Fetch the data for a single province
def __singleProvince(url: str, hostUrl: str, nameList: list):
    name = nameList[0]
    print('{}\tFetching [ {} ] from the NBS website'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), name))
    resultList = []
    __getAreaInfoByLoop(url, resultList, nameList, hostUrl)
    print('{}\tFetch finished, writing to the temporary file'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    __toCache(resultList, name)
    print('{}\t[ {} ] done'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), name))
# Append records to a temporary per-province cache file
def __toCache(dataList: list, name: str):
    with open('%s.log' % name, 'a', encoding = 'utf-8') as f:
        for item in dataList:
            code = item['code']
            nameList = item['name']
            rowItem = [ code ]
            rowItem += nameList
            f.write('%s\n' % '|'.join(rowItem))
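# Each cached line is the division code followed by the name path, joined with
# '|'; an illustrative line (values are made up) would look like:
#   110101001001|北京市|市辖区|东城区|东华门街道|多福巷社区居委会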
# Write the merged data to an Excel file
def __toExcel(dataList: list):
    # Output file name: 'NBS 2021 national administrative divisions_<timestamp>.xlsx'
    path = '国家统计局2021年全国行政区划_{}.xlsx'.format(datetime.now().strftime('%Y%m%d%H%M%S'))
    wb = Workbook(write_only = True)
    ws = wb.create_sheet('data', 0)
    # Column headers: code, province, city, district/county, town/street, village committee
    ws.append([ '编码', '省份', '城市', '区县', '街道', '居委会' ])
    for item in dataList:
        rowItem = item.replace('\n', '').split('|')
        ws.append(rowItem)
    wb.save(path)
    wb.close()
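# openpyxl's write-only mode streams rows straight to disk instead of holding
# the whole workbook in memory, which keeps the footprint flat for the several
# hundred thousand rows produced here; the trade-off is that rows can only be
# appended in order, never revisited.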
# Recursively fetch area information, walking down the division levels
def __getAreaInfoByLoop(url: str, resultList: list, lastNameList: list = [], hostUrl: str = None, type: str = None, isLoop: bool = False):
    result = []
    if type == 'province':
        result = __getAreaInfo(url, type = 'province')
    else:
        result = __getAreaInfo(url, hostUrl)
    __resultList = []
    for item in result:
        __url = item['url']
        __code = item['code']
        __name = item['name']
        # Child links on the NBS pages are relative, so rebuild the host url
        # of the next level by cutting trailing path segments from this one
        cutLength = len(lastNameList) if len(lastNameList) <= 1 else len(lastNameList) - 1
        if type == 'province':
            __hostUrl = hostUrl
        else:
            __hostUrl = '/'.join(__url.split('/')[:-cutLength])
        nameList = lastNameList.copy()
        nameList.append(__name)
        resultItem = { 'code': __code, 'name': nameList }
        # This print can be commented out; it only exists because the run takes
        # long and some output helps confirm the crawl is not stuck
        print(resultItem)
        resultList.append(resultItem)
        if not isLoop:
            if __url != '':
                sleep(1)
                __getAreaInfoByLoop(__url, resultList, nameList, __hostUrl)
        else:
            __resultList.append((__url, __hostUrl, resultItem['name']))
    if isLoop: return __resultList
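# Worked example of the host-url trimming above (paths illustrative): a city
# row on a province page yields __url = <base>/2021/11/1101.html; one level
# has been walked, so cutLength == 1 and the next host is the url minus its
# last segment, <base>/2021/11, which is the prefix that the relative county
# hrefs (e.g. '01/110101.html') are joined onto.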
# Fetch the area information on a single page
def __getAreaInfo(url: str, hostUrl: str = None, type: str = None) -> list:
    headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
        'Accept-Encoding': 'gzip, deflate',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36',
    }
    # Request with a 5-second timeout; on failure wait a second and retry, up to 100 times
    tryCount = 0
    htmlCode = ''
    while tryCount < 100:
        try:
            sessionObj = HTMLSession()
            htmlObj = sessionObj.get(url, headers = headers, timeout = 5)
            htmlObj.encoding = 'utf-8'
            htmlCode = htmlObj.text
            # The NBS site sometimes answers with a JavaScript page instead of
            # the data table; render it through requests_html's headless browser
            # to recover the real HTML
            jsList = re.findall(r'<script type="text/javascript">([\w\W]*)</script>', htmlObj.text)
            if len(jsList) > 0:
                print('Retry count:', tryCount, '; JavaScript page detected, rendering to repair the source', '; Url:', url)
                htmlObj.html.render(wait = 1, sleep = 1)
                sleep(1)
                htmlCode = htmlObj.html.html
                # Use the rendered page; without this break the loop would
                # re-request forever, since tryCount only grows on exceptions
                sessionObj.close()
                break
            else:
                if tryCount > 5: print('Retry count:', tryCount, '; Url:', url)
                # with open('a.txt', 'w', encoding = 'utf-8') as f: f.write(htmlCode)
                sessionObj.close()
                break
        except Exception as e:
            print(e)
            sleep(1)
            tryCount += 1
    soup = BeautifulSoup(htmlCode, 'lxml')
    result = []
    if type == 'province':
        # Province level: its page layout differs slightly from the other levels
        for item in soup.find_all('tr', class_ = 'provincetr'):
            for kidItem in item.find_all('a'):
                result.append({
                    'url': '{}/{}'.format(url, kidItem.attrs['href']),
                    'code': '',
                    'name': kidItem.text
                })
    else:
        # Every level below province shares nearly the same layout
        for item in soup.find_all('tr', class_ = ('citytr', 'countytr', 'towntr', 'villagetr')):
            __item = item.find_all('td')
            codeObj = __item[0]
            nameObj = __item[-1]
            resultItem = {}
            hrefCheck = codeObj.find_all('a')
            if len(hrefCheck) > 0:
                kidItem = hrefCheck[0]
                resultItem['url'] = '{}/{}'.format(hostUrl, kidItem.attrs['href'])
                resultItem['code'] = kidItem.text
            else:
                resultItem['url'] = ''
                resultItem['code'] = codeObj.text
            nameCheck = nameObj.find_all('a')
            if len(nameCheck) > 0:
                kidItem = nameCheck[0]
                resultItem['name'] = kidItem.text
            else:
                resultItem['name'] = nameObj.text
            result.append(resultItem)
    return result
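# Each returned dict describes one table row; an illustrative (made-up) entry:
#   { 'url': 'http://.../2021/11/01/110101.html', 'code': '110101000000', 'name': '东城区' }
# Leaf rows ('villagetr') carry no link, so their 'url' is the empty string.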
if __name__ == '__main__':
    main()