代码拉取完成,页面将自动刷新
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver import ActionChains
import requests, base64,time,json
from selenium.webdriver import Remote
from selenium.webdriver.chrome import options
from selenium.common.exceptions import InvalidArgumentException
from config import LOGIN_URL, DATA, URL, driver_path, API_URL, HEADER, GENERAL_API_KEY, GENERAL_SECRET_KEY
import logging, time
import re
logging.basicConfig(level=logging.INFO)
def open_url(url):
    """Start Chrome, load ``url`` and wait for the form inputs to appear.

    Waits up to 10 seconds until at least one element matching the CSS
    selector ``.van-field__control`` is present in the DOM.

    @param url: address of the page to open.
    @return: the Chrome driver with the page loaded.
    @raise: whatever ``get`` or the wait raises; the browser is closed first
        so that a failed start does not leave an orphaned Chrome process.
    """
    driver = webdriver.Chrome()
    try:
        driver.get(url)
        WebDriverWait(driver, 10).until(
            EC.presence_of_all_elements_located((By.CSS_SELECTOR, '.van-field__control')))
    except Exception:
        # The caller never receives the driver on failure, so nobody else
        # could close this browser.
        driver.quit()
        raise
    return driver
def open_web(driver, url):
    """Load ``url`` and persist the session details to ``1.json``.

    Because ``quit()`` is not called, the browser session stays alive. The
    session is identified by its session id; together with the command
    executor URL it is enough to restore the session later.

    @param driver: driver used to load the page.
    @param url: address to open.
    @return: the same driver.
    """
    driver.get(url)
    # Record executor URL and session id so the session can be reused.
    session_info = {
        'executor_url': driver.command_executor._url,
        'session_id': driver.session_id,
    }
    serialized = json.dumps(session_info)
    with open('1.json', 'w') as fp:
        fp.write(serialized)
    return driver
def find_element(driver):
    """Locate the login form controls on the current page.

    @param driver: driver showing the login page.
    @return: tuple ``(loginButton, inputLogin, inputPassword, inputVerify,
        verifyImage)``; the three inputs are the first three
        ``van-field__control`` inputs and the image is the third
        ``userImg`` inside ``userBox``.
    """
    fields = driver.find_elements_by_xpath("//input[@class='van-field__control']")
    inputLogin, inputPassword, inputVerify = fields[0], fields[1], fields[2]
    loginButton = driver.find_element_by_xpath("//div[@class='footBox']/button")
    images = driver.find_elements_by_xpath("//div[@class='userBox']//img[@class='userImg']")
    verifyImage = images[2]
    return loginButton, inputLogin, inputPassword, inputVerify, verifyImage
def user_defined(imgbase64, api_name='tongyong'):
    """Send a base64 image to the configured recognition endpoint.

    @param imgbase64: base64-encoded image data.
    @param api_name: key into ``API_URL`` selecting the endpoint.
    @return: whatever ``word_recognition`` returns for the request.
    """
    endpoint = ''.join((API_URL[api_name], "?access_token=", getToken()))
    payload = {'image': imgbase64}
    recognized = word_recognition(endpoint, payload, HEADER)
    print(recognized)
    return recognized
def get_code(src, i):
    """Save the captcha image locally and run recognition on it.

    @param src: image ``src`` value; the part after the first comma is taken
        as the base64 payload.
    @param i: attempt number, used for the file name ``{i}.jpg`` and the log line.
    @return: the recognition result from ``user_defined``.
    """
    # Extract the base64 payload.
    imgbase64 = src.split(',')[1]
    with open(f'{i}.jpg', 'wb') as image_file:
        raw_bytes = base64.b64decode(imgbase64)
        image_file.write(raw_bytes)
    print(f'第{i}次获取:')
    # Recognize the captcha in the image through the OCR service.
    return user_defined(imgbase64)
def action_login_nocode(driver, elements, username='pengr@hb', password='Xsdd119!'):
    """Clear the login form and type the account name and password.

    The captcha field is cleared but not filled; the login button is not
    clicked here.

    @param driver: driver showing the login page.
    @param elements: tuple as returned by ``find_element``.
    @param username: account name to type (defaults to the previous literal).
    @param password: password to type (defaults to the previous literal).
    """
    # NOTE(review): the default credentials are kept only for backward
    # compatibility; they should be moved out of the source (e.g. into the
    # config module) and passed in by the caller.
    _login_button, inputLogin, inputPassword, inputVerify, _verify_image = elements
    inputLogin.clear()
    inputPassword.clear()
    inputVerify.clear()
    actions = ActionChains(driver)
    actions.move_to_element(inputLogin)
    actions.send_keys_to_element(inputLogin, username)
    actions.move_to_element(inputPassword)
    actions.send_keys_to_element(inputPassword, password)
    actions.perform()
def action_login_code(driver, elements):
    """Fill in the captcha and click the login button.

    The captcha is read from standard input; when the user enters nothing
    it is obtained from ``get_ver`` instead.

    @param driver: driver showing the login page.
    @param elements: tuple as returned by ``find_element``.
    """
    loginButton, _login, _password, inputVerify, verifyImage = elements
    inputVerify.clear()
    code = input('请输入验证码:')
    time.sleep(1)
    if not code:
        # Nothing typed: fall back to automatic recognition.
        code = get_ver(verifyImage)
    chain = ActionChains(driver)
    chain.move_to_element(inputVerify)
    chain.send_keys_to_element(inputVerify, code)
    chain.click(loginButton)  # single click
    chain.perform()
def word_recognition(request_url, data, header):
    """POST an image to the recognition service and return the recognized lines.

    @param request_url: full endpoint URL including the access token.
    @param data: form data to post (the caller passes ``{'image': ...}``).
    @param header: HTTP headers for the request.
    @return: list with the ``words`` value of every entry in ``words_result``;
        an empty list when the reply carries no ``words_result`` (for
        instance an error reply), which callers such as ``get_ver`` treat as
        "nothing recognized, try again".
    """
    response = requests.post(request_url, data=data, headers=header)
    # Decode the body once and reuse it for both logging and extraction.
    payload = response.json()
    print(payload)
    return [entry['words'] for entry in payload.get('words_result', [])]
def getToken():
    """Request an access token using the configured API key and secret key.

    @return: the ``access_token`` field of the JSON reply.
    """
    token_url = (
        'https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials'
        f'&client_id={GENERAL_API_KEY}&client_secret={GENERAL_SECRET_KEY}'
    )
    reply = requests.get(token_url)
    return reply.json()['access_token']
def get_yanzhengma():
    """Download a captcha image directly from the site API.

    The API returns the image as binary data; a copy is written to ``2.jpg``.

    @return: the image data encoded as base64 (bytes).
    """
    captcha_endpoint = 'http://hb.ssj.119.gov.cn/api/mobileCommon/getVerifiCode'
    form = {'verifyCodeId': '9C1Kz8tX1609379911275', 'skipAuthKey': 'skipAuthKey'}
    reply = requests.post(captcha_endpoint, headers=HEADER, data=form)
    image_bytes = reply.content
    with open('2.jpg', 'wb') as image_file:
        image_file.write(image_bytes)
    # The image could be displayed with PIL at this point.
    return base64.b64encode(reply.content)
def get_ver(verifyImage):
    """Recognize the captcha, refreshing the image until a 4-character code is read.

    Each round reads the image ``src``, runs ``get_code`` on it and strips
    all whitespace from the first recognized line. If that does not yield
    exactly four characters, the image is clicked (to request a new one)
    and the loop waits one second before retrying. There is no upper bound
    on the number of rounds.

    @param verifyImage: the captcha image element.
    @return: the recognized 4-character code.
    """
    i = 1
    while True:
        code = get_code(verifyImage.get_property('src'), i)
        if code:
            print(f'原始识别:{code}')
            code = ''.join(code[0].split())
            if len(code) == 4:
                print(f'第{i}次识别,识别成功,code为{code}。。')
                return code
        # Either nothing was recognized or the length was wrong: refresh and retry.
        print(f'第{i}次识别未能识别,重新识别。。')
        i += 1
        verifyImage.click()
        time.sleep(1)
class ReuseChrome(Remote):
    """Remote driver that adopts an existing session id instead of opening a new one."""

    def __init__(self, command_executor, session_id):
        """Remember ``session_id`` and initialise the remote driver.

        @param command_executor: executor URL of the running browser driver.
        @param session_id: id of the session to attach to.
        """
        # Stored before the base initialiser runs; presumably the base
        # initialiser invokes start_session(), which reads it - confirm
        # against the installed Selenium version.
        self.r_session_id = session_id
        super().__init__(command_executor=command_executor, desired_capabilities={})

    def start_session(self, capabilities, browser_profile=None):
        """Override ``start_session`` so that the stored session id is reused.

        @param capabilities: capabilities dictionary.
        @param browser_profile: optional profile object exposing ``encoded``.
        @raise InvalidArgumentException: if ``capabilities`` is not a dict.
        """
        if not isinstance(capabilities, dict):
            raise InvalidArgumentException("Capabilities must be a dictionary")
        if browser_profile:
            encoded_profile = browser_profile.encoded
            if "moz:firefoxOptions" in capabilities:
                capabilities["moz:firefoxOptions"]["profile"] = encoded_profile
            else:
                capabilities.update({'firefox_profile': encoded_profile})
        # The capabilities come from default Chrome options, not from the
        # ``capabilities`` argument.
        self.capabilities = options.Options().to_capabilities()
        self.session_id = self.r_session_id
        # NOTE(review): presumably selects the legacy (non-W3C) protocol - confirm.
        self.w3c = False

    def __exit__(self, *args):
        """Only print a marker when leaving a ``with`` block."""
        print('exit')
def get_web():
    """Re-attach to the browser session recorded in ``1.json``.

    @return: a ``ReuseChrome`` driver bound to the saved executor URL and
        session id.
    """
    with open('1.json', 'r') as fp:
        url_info = json.load(fp)
    executor_url, session_id = url_info['executor_url'], url_info['session_id']
    print(executor_url, session_id)
    # Obtain the driver through the overridden Remote class.
    driver = ReuseChrome(command_executor=executor_url, session_id=session_id)
    driver.session_id = session_id
    return driver
# 以下代码为登录之后的操作
def test():
    """Parse a saved copy of the page and print the extracted fields.

    Reads the local HTML file, logs it, collects every span text found
    inside a ``van-cell__value`` div and prints entries 0, 3 and 4 as
    name, manager and phone.
    """
    saved_page = '湖北省 “双随机、一公开”消防监管信息系统.html'
    with open(saved_page, 'r', encoding='utf-8', errors='ignore') as fp:
        html = fp.read()
    logging.info(html)
    cell_pattern = r'<div.*?class="van-cell__value"><span.*?>(.*?)</span></div>'
    titles = re.findall(cell_pattern, html, re.DOTALL)
    print(titles)
    name, manager, phone = titles[0], titles[3], titles[4]
    print(name, manager, phone)
def write_info(driver):
    """Read name, manager and phone from the page and fill the first two inputs.

    The values are scraped from the page's outer HTML (entries 0, 3 and 4 of
    the ``van-cell__value`` span texts). The manager value is typed into the
    first ``van-field__control`` input and the phone into the second.

    @param driver: driver showing the form page.
    @return: tuple ``(name, phone, manager)``.
    """
    html = driver.find_element_by_xpath("//*").get_attribute('outerHTML')
    cell_pattern = r'<div.*?class="van-cell__value"><span.*?>(.*?)</span></div>'
    titles = re.findall(cell_pattern, html, re.DOTALL)
    print(titles)
    name, manager, phone = titles[0], titles[3], titles[4]
    print(name, manager, phone)

    info_inputs = driver.find_elements_by_xpath('//input[@class="van-field__control"]')
    name_input, phone_input = info_inputs[0], info_inputs[1]
    # For each box: focus it, wipe the existing text, then type the new value.
    for box, text in ((name_input, manager), (phone_input, phone)):
        box.click()
        box.clear()
        box.send_keys(text)
    return name, phone, manager
def unhold(check_item):
    """Return the wrapper elements of a collapse item, expanding it if needed.

    If no ``van-collapse-item__wrapper`` div is found inside ``check_item``,
    the item is clicked, one second is allowed to pass and the lookup is
    repeated.

    @param check_item: the collapse item element.
    @return: list of wrapper elements (may be empty if the click did not
        produce one).
    """
    wrapper_xpath = './/div[@class="van-collapse-item__wrapper"]'
    # Look the wrapper up once; the result is reused when it already exists.
    v = check_item.find_elements_by_xpath(wrapper_xpath)
    if not v:
        print('点击元素')
        check_item.click()
        time.sleep(1)
        v = check_item.find_elements_by_xpath(wrapper_xpath)
    return v
def _fill_field(field, text):
    """Focus ``field``, clear it and type ``text`` into it."""
    # Values set through DOM scripting come back empty when the form is
    # re-opened after saving, so real user input has to be simulated.
    field.click()
    field.clear()
    field.send_keys(text)


def general_search(driver, tag, v, key, value):
    """Fill the field(s) inside ``v[0]`` whose placeholder equals ``key``.

    With exactly one matching field, ``value`` is typed into it. With two or
    more matching fields and ``len(value) > 1``:

    * a string ``value`` is typed in full into every matching field (it is
      no longer split one character per field);
    * any other sequence is paired with the fields in order; surplus fields
      or surplus values are ignored instead of raising ``IndexError``.

    In every other case a "not found" message is printed.

    @param driver: unused; kept for interface compatibility.
    @param tag: tag name of the field, e.g. ``input`` or ``textarea``.
    @param v: list whose first element is the container to search in.
    @param key: placeholder text identifying the field(s).
    @param value: text, or sequence of texts, to type.
    """
    k = v[0].find_elements_by_xpath(f'.//{tag}[@placeholder="{key}"]')
    if len(k) == 1:
        _fill_field(k[0], value)
    elif len(k) >= 2 and len(value) > 1:
        texts = [value] * len(k) if isinstance(value, str) else value
        for field, text in zip(k, texts):
            _fill_field(field, text)
    else:
        print(f'未找到该元素包含{key}的input')
def select_yes_no(driver, radio_group, flag):
    """Choose "yes" or "no" in a two-option radio group.

    The round radio icons inside ``radio_group[0]`` are looked up; the first
    is treated as "yes" and the second as "no". The chosen icon gets the
    ``van-radio__icon--checked`` class via script, the other loses it, and
    the chosen icon is then clicked.

    @param driver: driver used to run the script.
    @param radio_group: list whose first element contains the radio icons.
    @param flag: truthy selects "yes", falsy selects "no".
    """
    icons = radio_group[0].find_elements_by_xpath(
        './/div[contains(@class,"van-radio__icon van-radio__icon--round")]')
    radio_yes, radio_no = icons[0], icons[1]
    toggle_script = (
        "\n"
        "arguments[0].classList.add('van-radio__icon--checked')\n"
        "arguments[1].classList.remove('van-radio__icon--checked');\n"
    )
    chosen, other = (radio_yes, radio_no) if flag else (radio_no, radio_yes)
    driver.execute_script(toggle_script, chosen, other)
    chosen.click()
def xuke(driver, v, name):
    """Fill the section's inputs: the building name and the "other" remark.

    @param driver: passed through to ``general_search``.
    @param v: wrapper element list of the expanded section.
    @param name: building name to enter.
    """
    entries = (
        ('请输入被查建筑物名称', name),
        ('请输入其他情况', '现场未提供'),
    )
    for placeholder, text in entries:
        general_search(driver, 'input', v, placeholder, text)
def guanli(driver, v):
    """Fill the section's "other" input with a fixed remark.

    @param driver: passed through to ``general_search``.
    @param v: wrapper element list of the expanded section.
    """
    general_search(driver, tag='input', v=v, key='请输入其他情况', value='未见异常')
def fanghuo(driver, v):
    """Fill the section's inspected-location inputs and its findings textarea.

    @param driver: passed through to ``general_search``.
    @param v: wrapper element list of the expanded section.
    """
    entries = (
        ('input', '请输入抽查部位', ['/', '/']),
        ('textarea', '请输入检查情况', '未见异常'),
    )
    for tag, placeholder, text in entries:
        general_search(driver, tag, v, placeholder, text)
def susan(driver, v):
    """Fill the section's four inspected-location inputs and its findings textarea.

    @param driver: passed through to ``general_search``.
    @param v: wrapper element list of the expanded section.
    """
    entries = (
        ('input', '请输入抽查部位', ['该单位', '该单位', '该单位', '该单位']),
        ('textarea', '请输入检查情况', '未见异常'),
    )
    for tag, placeholder, text in entries:
        general_search(driver, tag, v, placeholder, text)
def find_radio_section(v):
    """Return the first two ``van-doc-demo-block`` sections inside ``v[0]``.

    @param v: list whose first element is the container to search in.
    @return: tuple ``(section1, section2)``; ``section2`` is ``None`` when
        only one section exists.
    @raise IndexError: if no section is found at all.
    """
    section = v[0].find_elements_by_xpath('.//section[@class="van-doc-demo-block"]')
    section1 = section[0]
    # Explicit length check instead of a bare ``except`` that would also
    # swallow unrelated errors.
    section2 = section[1] if len(section) > 1 else None
    return section1, section2
def radio_select(v, driver, flag=False):
    """Apply the yes/no choice to the first radio of section 1, twice.

    The sections are located and the choice is made; after a 1.5 second
    pause the sections are located again and the choice is repeated. The
    values from the second pass are returned.

    @param v: wrapper element list of the expanded section.
    @param driver: driver passed to ``select_yes_no``.
    @param flag: truthy selects "yes", falsy selects "no".
    @return: tuple ``(van_cell_title, radio_group, section2)``.
    """
    for pass_number in (1, 2):
        if pass_number == 2:
            time.sleep(1.5)
        first_section, section2 = find_radio_section(v)
        # Section 1 is mandatory, so it is handled first.
        van_cell_title, radio_group = list(get_section_info(first_section))[0]
        select_yes_no(driver, radio_group, flag)
    return van_cell_title, radio_group, section2
def get_section_info(section):
    """Yield ``(title_element, radio_groups)`` for every ``van-cell`` in ``section``.

    @param section: section element to scan.
    @return: generator of tuples; the first item is the cell's first
        ``van-cell__title`` div, the second the list of ``van-radio-group``
        divs inside the cell's first ``van-cell__value`` div.
    """
    for cell in section.find_elements_by_xpath('.//div[@class="van-cell"]'):
        title_element = cell.find_elements_by_xpath('.//div[@class="van-cell__title"]')[0]
        value_boxes = cell.find_elements_by_xpath('.//div[@class="van-cell__value"]')
        groups = value_boxes[0].find_elements_by_xpath('.//div[@class="van-radio-group"]')
        yield title_element, groups
def general_radios(v, driver, flag):
    """Set the main radio of a section and, when enabled, all radios of section 2.

    @param v: wrapper element list of the expanded section.
    @param driver: driver passed to the radio helpers.
    @param flag: truthy selects "yes" for the main radio and then for every
        radio group of the second section; falsy selects "no" for the main
        radio only.
    @return: the title element of the main radio's cell.
    """
    van_cell_title, _main_group, section2 = radio_select(v, driver, flag)
    # radio_select reports ``None`` when the second section does not exist;
    # there is nothing to tick in that case.
    if flag and section2 is not None:
        for _title, radio_group in get_section_info(section2):
            select_yes_no(driver, radio_group, True)
    return van_cell_title
def kongzhishi(v, driver, flag):
    """Set the radios of the section, then fill its fields when ``flag`` is set.

    @param v: wrapper element list of the expanded section.
    @param driver: driver passed to the helpers.
    @param flag: truthy fills the fields; falsy only prints that the item
        is absent.
    """
    van_cell_title = general_radios(v, driver, flag)
    if not flag:
        print(f'该单位没有此项{van_cell_title}')
        return
    entries = (
        ('input', '请输入在岗人数', '1'),
        ('input', '请输入抽查部位', '/'),
        ('textarea', '请输入检查情况', '未见异常'),
    )
    for tag, placeholder, text in entries:
        general_search(driver, tag, v, placeholder, text)
def baojing(v, driver, flag):
    """Set the radios of the section, then fill its fields when ``flag`` is set.

    @param v: wrapper element list of the expanded section.
    @param driver: driver passed to the helpers.
    @param flag: truthy fills the fields; falsy only prints that the item
        is absent.
    """
    van_cell_title = general_radios(v, driver, flag)
    if not flag:
        print(f'该单位没有此项{van_cell_title}')
        return
    entries = (
        ('input', '请输入抽查部位及数量', '该单位,2'),
        ('input', '请输入抽查部位', ['该单位', '控制室', '/']),
        ('input', '请输入其他设施', '/'),
        ('textarea', '请输入检查情况', '未见异常'),
    )
    for tag, placeholder, text in entries:
        general_search(driver, tag, v, placeholder, text)
def geishui(v, driver, flag):
    """Set the radios of the section, then fill its fields when ``flag`` is set.

    @param v: wrapper element list of the expanded section.
    @param driver: driver passed to the helpers.
    @param flag: truthy fills the fields; falsy only prints that the item
        is absent.
    """
    van_cell_title = general_radios(v, driver, flag)
    if not flag:
        print(f'该单位没有此项{van_cell_title}')
        return
    locations = ['水泵房', '屋顶', '水泵房', '该单位', '该单位', '该单位', '该单位', '/']
    entries = (
        ('input', '请输入抽查部位', locations),
        ('input', '请输入其他设施', '/'),
        ('textarea', '请输入检查情况', '未见异常'),
    )
    for tag, placeholder, text in entries:
        general_search(driver, tag, v, placeholder, text)
def penshui(v, driver, flag):
    """Set the radios of the section, then fill its fields when ``flag`` is set.

    Nothing is printed when ``flag`` is falsy.

    @param v: wrapper element list of the expanded section.
    @param driver: driver passed to the helpers.
    @param flag: truthy fills the fields.
    """
    general_radios(v, driver, flag)
    if not flag:
        return
    entries = (
        ('input', '请输入抽查部位', ['水泵房', '该单位', '/']),
        ('input', '请输入其他设施', '/'),
        ('input', '请输入末端试水装置压力值', '0.3Mpa'),
        ('textarea', '请输入检查情况', '未见异常'),
    )
    for tag, placeholder, text in entries:
        general_search(driver, tag, v, placeholder, text)
def qita(v, driver, flag):
    """Tick "yes" on every radio of the first section and fill the section's fields.

    @param v: wrapper element list of the expanded section.
    @param driver: driver passed to the helpers.
    @param flag: not used by this function.
    """
    sections = v[0].find_elements_by_xpath('.//section[@class="van-doc-demo-block"]')
    first_section = sections[0]
    for _title, radio_group in get_section_info(first_section):
        select_yes_no(driver, radio_group, True)
    entries = (
        ('input', '请输入抽查部位', ['该单位', '该单位', '该单位', '/']),
        ('input', '请输入抽查部位及数量', '该单位,4'),
        ('input', '请输入其他设施', '/'),
        ('textarea', '请输入检查情况', '未见异常'),
    )
    for tag, placeholder, text in entries:
        general_search(driver, tag, v, placeholder, text)
def last_qita(v, driver, flag):
    """Fill the "other" textarea and mark the first option of each three-way radio.

    For every ``van-radio-group`` in the first section the three round
    icons are looked up; when ``flag`` is truthy the first icon receives
    the ``van-radio__icon--checked`` class via script and the other two
    lose it. No click is performed.

    @param v: wrapper element list of the expanded section.
    @param driver: driver used to run the script.
    @param flag: truthy applies the class changes.
    """
    sections = v[0].find_elements_by_xpath('.//section[@class="van-doc-demo-block"]')
    first_section = sections[0]
    van_groups = first_section.find_elements_by_xpath('.//div[@class="van-radio-group"]')
    general_search(driver, 'textarea', v, '请输入其他情况', '/')
    mark_first_script = (
        "\n"
        "arguments[0].classList.add('van-radio__icon--checked')\n"
        "arguments[1].classList.remove('van-radio__icon--checked')\n"
        "arguments[2].classList.remove('van-radio__icon--checked');\n"
    )
    for group in van_groups:
        icons = group.find_elements_by_xpath(
            './/div[contains(@class,"van-radio__icon van-radio__icon--round")]')
        radio_nocare, radio_yes, radio_no = icons[0], icons[1], icons[2]
        if flag:
            driver.execute_script(mark_first_script, radio_nocare, radio_yes, radio_no)
def click_all(driver, name):
    """Expand every collapsed check item and fill it in according to its title.

    @param driver: driver showing the inspection form.
    @param name: building name, forwarded to ``xuke``.
    @return: None
    """
    # Collect the div of every check item (both class variants).
    check_items = (
        driver.find_elements_by_xpath('//div[@class="van-collapse-item"]')
        + driver.find_elements_by_xpath('//div[@class="van-collapse-item van-hairline--top"]')
    )
    print(len(check_items))

    # Section title -> action taking the expanded wrapper list ``v``.
    handlers = {
        '消防许可及验收备案': lambda v: xuke(driver, v, name),
        '消防安全管理': lambda v: guanli(driver, v),
        '建筑防火': lambda v: fanghuo(driver, v),
        '安全疏散': lambda v: susan(driver, v),
        '消防控制室': lambda v: kongzhishi(v, driver, True),
        '消防设施器材-火灾自动报警系统': lambda v: baojing(v, driver, True),
        '消防设施器材-消防给水设施': lambda v: geishui(v, driver, True),
        '消防设施器材-自动灭火系统': lambda v: penshui(v, driver, True),
        '消防设施器材-其它设施器材': lambda v: qita(v, driver, True),
        '其它消防安全管理': lambda v: last_qita(v, driver, True),
    }

    for check_item in check_items:
        # Expand the item and obtain the ".van-collapse-item__wrapper"
        # elements; child lookups need the leading "." in the XPath.
        v = unhold(check_item)
        title = check_item.find_elements_by_xpath(
            ".//div[contains(@class,'van-cell van-cell--clickable van-collapse-item__title')]")
        if not title:
            continue
        # Read the title text once instead of once per comparison.
        title_text = title[0].text
        print(f'操作当前板块:{title_text}')
        handler = handlers.get(title_text)
        if handler is not None:
            handler(v)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。