From 1dc8ce1e5c675354de988fafdbb416c241a934b6 Mon Sep 17 00:00:00 2001 From: general Date: Mon, 15 Apr 2019 15:38:02 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E4=BF=AE=E6=AD=A3trans=E6=96=B9=E6=B3=95,?= =?UTF-8?q?=20=E5=AE=8C=E5=96=84=E9=93=BE=E6=8E=A5=E7=9A=84=E6=9C=AC?= =?UTF-8?q?=E5=9C=B0=E8=BD=AC=E6=8D=A2=E6=93=8D=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cache_queue.py | 3 ++- utils.py | 10 ++++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/cache_queue.py b/cache_queue.py index 846fabe..d6d59b0 100644 --- a/cache_queue.py +++ b/cache_queue.py @@ -1,7 +1,8 @@ class CacheQueue(list): ''' 为了避免数据丢失, 需要定时将队列中的数据持久化到数据库. - 但是原生队列无法使用copy/deepcopy拷贝, 会报 + 但是原生队列无法使用copy/deepcopy拷贝, 会报TypeError: can't pickle _thread.lock objects. + 就算使用pickle模块也是这个问题, 无法实例化. 为了保证取出数据的同时不丢失数据, 需要先get再put回去, 太傻了. 这里用list模拟简单队列. ''' diff --git a/utils.py b/utils.py index 157149a..3bdfcdc 100644 --- a/utils.py +++ b/utils.py @@ -87,7 +87,7 @@ def trans_to_local_link(url, is_page = True): origin_query = urlObj.query local_path = origin_path - # url除去最后的/ + if local_path == "": local_path = 'index.html' if local_path.endswith('/'): local_path += 'index.html' if origin_query != '': @@ -99,17 +99,15 @@ def trans_to_local_link(url, is_page = True): if is_page and not local_path.endswith('.html') and not local_path.endswith('.htm'): local_path += '.html' - ## 如果该url就是这个站点域名下的,那么无需新建域名目录存放 + ## 如果该url就是当前站点域名下的,那么无需新建域名目录存放. ## 如果是其他站点的(需要事先开启允许下载其他站点的配置), ## 则要将资源存放在以站点域名为名的目录下, 路径中仍然需要保留域名部分. ## 有时host中可能包含冒号, 需要转义. if origin_host != main_site: - local_path = origin_host.replace(':', special_chars[':']) + local_path + local_path = '/' + origin_host.replace(':', special_chars[':']) + local_path ## url中可能包含中文, 需要解码. 
local_path = unquote(local_path) - - if origin_host != main_site: local_path = '/' + local_path return local_path def trans_to_local_path(url, is_page = True): @@ -120,7 +118,7 @@ def trans_to_local_path(url, is_page = True): ''' local_link = trans_to_local_link(url, is_page) ## 如果是站外资源, local_link可能为/www.xxx.com/static/x.jpg, - ## 但我们需要的存储目录是相对路径, 所以需要事先将 + ## 但我们需要的存储目录是相对路径, 所以需要事先将链接起始的/移除 if local_link.startswith('/'): local_link = local_link[1:] file_dir = os.path.dirname(local_link) file_name = os.path.basename(local_link) -- Gitee From d3931f5023469dbaecb893a3fcf0b6c35d9655f4 Mon Sep 17 00:00:00 2001 From: general Date: Tue, 16 Apr 2019 01:37:15 +0800 Subject: [PATCH 2/2] =?UTF-8?q?task=E5=AD=97=E5=85=B8=E5=AF=B9=E8=B1=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 2 +- crawler.py | 124 +++++++++++++++++++++++++-------------------- db.py | 22 ++++---- docker-compose.yml | 2 +- page_parser.py | 75 +++++++++++++++++---------- settings.py | 8 +-- task.py | 3 ++ transform.py | 79 +++++++++++++++++++++++++++++ utils.py | 83 +++++++----------------------- worker_pool.py | 25 +++++---- 10 files changed, 251 insertions(+), 172 deletions(-) create mode 100644 task.py create mode 100644 transform.py diff --git a/.gitignore b/.gitignore index 5dff451..cc14c0a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ -output/ +sites/ logs/ *.pyc .idea diff --git a/crawler.py b/crawler.py index 3d19b69..1e5d416 100644 --- a/crawler.py +++ b/crawler.py @@ -10,9 +10,10 @@ from pyquery import PyQuery from settings import outsite_asset, doc_pool_max, res_pool_max, main_url, max_depth, max_retry_times, site_db from page_parser import get_page_charset, parse_linking_pages, parse_linking_assets, parse_css_file -from utils import logger, empty_link_pattern, request_get_async, save_file_async, trans_to_local_link, trans_to_local_path +from utils import logger, empty_link_pattern, request_get_async, save_file_async +from transform import trans_to_local_path from worker_pool import WorkerPool -from db import init_db, query_url_record, add_url_record, query_page_tasks, query_asset_tasks, save_page_task, save_asset_task, update_record_to_success +from db import init_db, query_url_record, add_url_record, query_page_tasks, query_asset_tasks, save_page_task, save_asset_task, update_record_status from cache_queue import CacheQueue class Crawler: @@ -25,34 +26,47 @@ class Crawler: ## 初始化数据文件, 创建表 self.db_conn = init_db(site_db) self.load_queue() - self.enqueue_page(main_url, '', 1) + main_task = { + 'url': main_url, + 'url_type': 'page', + 'refer': '', + 'depth': 1, + 'failed_times': 0, + } + self.enqueue_page(main_task) self.page_worker_pool = WorkerPool(self.page_queue, self.get_html_page, doc_pool_max, worker_type = 'page') self.asset_worker_pool = WorkerPool(self.asset_queue, self.get_static_asset, res_pool_max, worker_type = 'asset') def start(self): self.page_worker_pool.start() - logger.info('page worker pool complete') + logger.info('页面工作池启动') - def get_html_page(self, request_url, refer, depth, failed_times): + def get_html_page(self, task): ''' 抓取目标页面 ''' - msg = 'get_static_asset: request_url %s, refer %s, depth %d, failed_times %d' % (request_url, refer, depth, failed_times) - logger.debug(msg) - if 0 < max_depth and max_depth < depth: - logger.warning('目标url: %s 已超过最大深度' % request_url) + msg = 'get_static_asset(): task: {task:s}' + logger.debug(msg.format(task = str(task))) + if 0 < max_depth and max_depth < task['depth']: + msg = 
'已超过最大深度: task: {task:s}' + logger.warning(msg.format(task = str(task))) return - if failed_times > max_retry_times: - logger.warning('目标url: %s 失败次数过多, 不再重试' % request_url) + if task['failed_times'] > max_retry_times: + msg = '失败次数过多, 不再重试: task: {task:s}' + logger.warning(msg.format(task = str(task))) return - code, resp = request_get_async(request_url, refer) + code, resp = request_get_async(task) if not code: - logger.error('请求页面失败 %s, referer %s, 重新入队列 %s' % (request_url, refer, resp)) + msg = '请求页面失败, 重新入队列: task: {task:s}, err: {err:s}' + logger.error(msg.format(task = str(task), err = resp)) ## 出现异常, 则失败次数加1 ## 不需要调用enqueue(), 直接入队列. - self.page_queue.push((request_url, refer, depth, failed_times + 1)) + task['failed_times'] += 1 + self.page_queue.push(task) return + elif resp.status_code == 404: + update_record_status(self.db_conn, task['url'], 'failed') try: charset = get_page_charset(resp.content) @@ -61,67 +75,72 @@ class Crawler: ## 超过最大深度的页面不再抓取, 在入队列前就先判断. ## 但超过静态文件无所谓深度, 所以还是要抓取的. - if 0 < max_depth and max_depth < depth + 1: - logger.warning('当前页面: %s 已达到最大深度, 不再抓取新页面' % (request_url, )) + if 0 < max_depth and max_depth < task['depth'] + 1: + msg = '当前页面已达到最大深度, 不再抓取新页面: task {task:s}' + logger.warning(msg.format(task = str(task))) else: - parse_linking_pages(pq_selector, request_url, depth+1, callback = self.enqueue_page) - - parse_linking_assets(pq_selector, request_url, depth+1, callback = self.enqueue_asset) + parse_linking_pages(pq_selector, task, callback = self.enqueue_page) + parse_linking_assets(pq_selector, task, callback = self.enqueue_asset) ## 抓取此页面上的静态文件 - self.asset_worker_pool.start(page_url=request_url) + self.asset_worker_pool.start(task) byte_content = pq_selector.outer_html().encode('utf-8') - file_path, file_name = trans_to_local_path(request_url, True) + file_path, file_name = trans_to_local_path(task['url'], 'page') code, data = save_file_async(file_path, file_name, byte_content) - if code: self.set_record_to_success(request_url) + if code: update_record_status(self.db_conn, task['url'], 'success') except Exception as err: - logger.error('parse page failed for %s refer %s: %s' % (request_url, refer, err)) + msg = '保存页面文件失败: task: {task:s}, err: {err:s}' + logger.error(msg.format(task = str(task), err = err)) - def get_static_asset(self, request_url, refer, depth, failed_times): + def get_static_asset(self, task): ''' 请求静态文件, css/js/img等并存储. 
''' - msg = 'get_static_asset: request_url %s, refer %s, depth %d, failed_times %d' % (request_url, refer, depth, failed_times) - logger.debug(msg) + msg = 'get_static_asset(): task: {task:s}' + logger.debug(msg.format(task = str(task))) ## 如果该链接已经超过了最大尝试次数, 则放弃 - if failed_times > max_retry_times: return + if task['failed_times'] > max_retry_times: return - code, resp = request_get_async(request_url, refer) + code, resp = request_get_async(task) if not code: - logger.error('请求静态资源失败 %s, 重新入队列' % (request_url,)) + msg = '请求静态资源失败, 重新入队列: task: {task:s}, err: {err:s}' + logger.error(msg.format(task = str(task), err = resp)) ## 出现异常, 则失败次数加1 - self.asset_queue.push((request_url, refer, depth, failed_times + 1)) + task['failed_times'] += 1 + self.asset_queue.push(task) return - + elif resp.status_code == 404: + update_record_status(self.db_conn, task['url'], 'failed') + try: content = resp.content if 'content-type' in resp.headers and 'text/css' in resp.headers['content-type']: - content = parse_css_file(resp.text, request_url, depth, callback = self.enqueue_asset) - file_path, file_name = trans_to_local_path(request_url, False) + content = parse_css_file(resp.text, task, callback = self.enqueue_asset) + file_path, file_name = trans_to_local_path(task['url'], 'asset') code, data = save_file_async(file_path, file_name, content) - if code: self.set_record_to_success(request_url) + if code: update_record_status(self.db_conn, task['url'], 'success') except Exception as err: - logger.error('parse static asset failed for %s in page %s: %s' % (request_url, refer, err)) + msg = '保存静态文件失败: task: {task:s}, err: {err:s}' + logger.error(msg.format(task = str(task), err = err)) - def enqueue_asset(self, url, refer, depth): + def enqueue_asset(self, task): ''' 如果该url已经添加入url_records记录, 就不再重新入队列. 已进入队列的url, 必定已经存在记录, 但不一定能成功下载. 每50个url入队列都将队列内容备份到数据库, 以免丢失. 
''' - if query_url_record(self.db_conn, url): return - - self.asset_queue.push((url, refer, depth, 0)) - add_url_record(self.db_conn, url, refer, depth, 'asset') + if query_url_record(self.db_conn, task['url']): return + self.asset_queue.push(task) + add_url_record(self.db_conn, task) self.asset_counter += 1 if self.asset_counter >= 50: self.asset_counter = 0 self.save_queue() - def enqueue_page(self, url, refer, depth): - if query_url_record(self.db_conn, url): return - self.page_queue.push((url, refer, depth, 0)) - add_url_record(self.db_conn, url, refer, depth, 'page') + def enqueue_page(self, task): + if query_url_record(self.db_conn, task['url']): return + self.page_queue.push(task) + add_url_record(self.db_conn, task) self.page_counter += 1 if self.page_counter >= 50: self.page_counter = 0 @@ -131,12 +150,10 @@ class Crawler: logger.debug('初始化任务队列') page_tasks = query_page_tasks(self.db_conn) for task in page_tasks: - item = (task[0], task[1], int(task[2]), int(task[3])) - self.page_queue.push(item) + self.page_queue.push(task) asset_tasks = query_asset_tasks(self.db_conn) for task in asset_tasks: - item = (task[0], task[1], int(task[2]), int(task[3])) - self.asset_queue.push(item) + self.asset_queue.push(task) logger.debug('初始化任务队列完成') def save_queue(self): @@ -152,13 +169,13 @@ class Crawler: ## 将队列中的成员写入数据库作为备份 while True: if _tmp_page_queue.empty(): break - item = _tmp_page_queue.pop() - page_tasks.append(tuple(item)) + task = _tmp_page_queue.pop() + page_tasks.append(task) while True: if _tmp_asset_queue.empty(): break - item = _tmp_asset_queue.pop() - asset_tasks.append(tuple(item)) + task = _tmp_asset_queue.pop() + asset_tasks.append(task) if len(page_tasks) > 0: save_page_task(self.db_conn, page_tasks) @@ -166,9 +183,6 @@ class Crawler: save_asset_task(self.db_conn, asset_tasks) logger.debug('保存任务队列完成') - def set_record_to_success(self, url): - update_record_to_success(self.db_conn, url) - def stop(self): ''' 任务停止前存储队列以便之后继续 diff --git a/db.py b/db.py index 531433d..e30dea0 100644 --- a/db.py +++ b/db.py @@ -14,8 +14,8 @@ def init_db(db_file): url varchar(512) unique, -- 已抓取过的url(可以是页面, 可以是静态资源), 唯一, 作为索引键 refer varchar(512), depth int, - url_type varchar(50), -- page, asset两种类型 - success int default 0 + url_type varchar(50), -- page, asset 2种类型 + status varchar(50) default 'init' -- init, success, failed 3种类型 ) ''' cursor.execute(sql_str) @@ -28,7 +28,7 @@ def init_db(db_file): sql_str = ''' create table if not exists page_tasks( id integer primary key autoincrement, - url varchar(512) unique, -- '已抓取过的url(可以是页面, 可以是静态资源), 唯一, 作为索引键' + url varchar(512) unique, -- 已抓取过的url(可以是页面, 可以是静态资源), 唯一, 作为索引键 refer varchar(512), depth int, failed_times int @@ -41,7 +41,7 @@ def init_db(db_file): sql_str = ''' create table if not exists asset_tasks( id integer primary key autoincrement, - url varchar(512) unique, -- '已抓取过的url(可以是页面, 可以是静态资源), 唯一, 作为索引键' + url varchar(512) unique, -- 已抓取过的url(可以是页面, 可以是静态资源), 唯一, 作为索引键 refer varchar(512), depth int, failed_times int @@ -61,13 +61,13 @@ def query_url_record(db_conn, url): cursor.close() return row -def add_url_record(db_conn, url, refer, depth, url_type): +def add_url_record(db_conn, task): ''' return: 返回新插入行的id ''' sql_str = 'insert into url_records(url, refer, depth, url_type) values(?, ?, ?, ?)' cursor = db_conn.cursor() - cursor.execute(sql_str, (url, refer, depth, url_type, )) + cursor.execute(sql_str, (task['url'], task['refer'], task['depth'], task['url_type'], )) ## 获取新插入数据id的方法 last_id = cursor.lastrowid ## 默认关闭自动提交 @@ -89,11 
+89,15 @@ def query_page_tasks(db_conn): def query_asset_tasks(db_conn): return query_tasks(db_conn, 'asset_tasks') -def update_record_to_success(db_conn, url): - sql_str = 'update url_records set success = 1 where url = ?' +def update_record_status(db_conn, url, status): + ''' + @param: url 目标记录url + @param: status 目标记录状态, 字符串. 可选值为: init, success, failed + ''' + sql_str = 'update url_records set status = ? where url = ?' cursor = db_conn.cursor() ## 单个元素的写法, 注意如果是元组形式, 必须为逗号结尾. - cursor.execute(sql_str, (url,)) + cursor.execute(sql_str, (status, url,)) db_conn.commit() cursor.close() diff --git a/docker-compose.yml b/docker-compose.yml index 0d5a463..f17f6c7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,4 +7,4 @@ services: - 8080:80 volumes: - ./nginx.conf:/etc/nginx/conf.d/main.conf - - ./output:/usr/share/nginx/html + - ./sites:/usr/share/nginx/html diff --git a/page_parser.py b/page_parser.py index 764e7b9..cb5fd39 100644 --- a/page_parser.py +++ b/page_parser.py @@ -4,7 +4,8 @@ from urllib.parse import urljoin, urlparse, urldefrag from pyquery import PyQuery from settings import outsite_asset -from utils import logger, empty_link_pattern, get_main_site, trans_to_local_link, url_filter +from utils import logger, empty_link_pattern, get_main_site, url_filter +from transform import trans_to_local_link def get_page_charset(page_content): ''' @@ -20,7 +21,7 @@ def get_page_charset(page_content): if meta2 is not None: charset = meta2 return charset -def parse_linking_pages(pq_selector, page_url, depth, callback = None): +def parse_linking_pages(pq_selector, task, callback = None): ''' 分别解析页面中的a, iframe等元素的链接属性, 得到http(s)://式的url, 并调用callback入队列. @@ -28,16 +29,16 @@ def parse_linking_pages(pq_selector, page_url, depth, callback = None): 不可以直接传递string类型的页面内容. ''' a_list = pq_selector('a') - _parse_linking_pages(a_list, page_url, 'href', depth, callback=callback) + _parse_linking_pages(a_list, 'href', task, callback=callback) -def _parse_linking_pages(element_list, origin_url, attr_name, depth, callback = None): +def _parse_linking_pages(node_list, attr_name, task, callback = None): ''' 处理页面中a标签, 将页面本身的url与a标签中的地址计算得到实际可访问的url, 然后加入队列. 同时修改原页面内容中a标签的链接属性值, 使得这些链接可指向下载到本地的html文件. ''' main_site = get_main_site() - for li in element_list: - url_attr = PyQuery(li).attr(attr_name) + for node_item in node_list: + url_attr = PyQuery(node_item).attr(attr_name) if url_attr is None or re.search(empty_link_pattern, url_attr): continue ## 拼接url并忽略url中的井号 @@ -47,11 +48,18 @@ def _parse_linking_pages(element_list, origin_url, attr_name, depth, callback = if not url_filter(full_url, url_type='page'): continue ## 重设链接地址为本地路径 - local_link = trans_to_local_link(full_url, True) - PyQuery(li).attr(attr_name, local_link) - if callback: callback(full_url, origin_url, depth) - -def parse_linking_assets(pq_selector, page_url, depth, callback = None): + local_link = trans_to_local_link(full_url, 'page') + PyQuery(node_item).attr(attr_name, local_link) + new_task = { + 'url': full_url, + 'url_type': 'page', + 'refer': task['url'], + 'depth': task['depth'] + 1, + 'failed_times': 0, + } + if callback: callback(new_task) + +def parse_linking_assets(pq_selector, task, callback = None): ''' 分别解析页面中的link, script, img等元素的链接属性, 得到http(s)://式的url, 并调用callback入队列. @@ -59,33 +67,40 @@ def parse_linking_assets(pq_selector, page_url, depth, callback = None): 不可以直接传递string类型的页面内容. 
''' link_list = pq_selector('link') - _parse_linking_assets(link_list, page_url, 'href', depth, callback) + _parse_linking_assets(link_list, 'href', task, callback) script_list = pq_selector('script') - _parse_linking_assets(script_list, page_url, 'src', depth, callback) + _parse_linking_assets(script_list, 'src', task, callback) img_list = pq_selector('img') - _parse_linking_assets(img_list, page_url, 'src', depth, callback) + _parse_linking_assets(img_list, 'src', task, callback) -def _parse_linking_assets(element_list, origin_url, attr_name, depth, callback): +def _parse_linking_assets(node_list, attr_name, task, callback): main_site = get_main_site() - for li in element_list: - url_attr = PyQuery(li).attr(attr_name) + for node_item in node_list: + url_attr = PyQuery(node_item).attr(attr_name) if url_attr is None or re.search(empty_link_pattern, url_attr): continue ## 拼接url并忽略url中的井号 - full_url = urljoin(origin_url, url_attr) + full_url = urljoin(task['url'], url_attr) full_url = urldefrag(full_url).url ## 如果不满足过滤规则则跳过 if not url_filter(full_url, url_type='asset'): continue ## 重设链接地址为本地路径 - local_link = trans_to_local_link(full_url, False) - PyQuery(li).attr(attr_name, local_link) - if callback: callback(full_url, origin_url, depth) - -def parse_css_file(content, origin_url, depth, callback = None): + local_link = trans_to_local_link(full_url, 'asset') + PyQuery(node_item).attr(attr_name, local_link) + new_task = { + 'url': full_url, + 'url_type': 'asset', + 'refer': task['url'], + 'depth': task['depth'] + 1, + 'failed_times': 0, + } + if callback: callback(new_task) + +def parse_css_file(content, task, callback = None): ''' 处理css文件中对静态资源的引用, 将引用的静态资源加入队列, @@ -106,11 +121,17 @@ def parse_css_file(content, origin_url, depth, callback = None): or re.search(empty_link_pattern, match_url): continue - full_url = urljoin(origin_url, match_url) + full_url = urljoin(task['url'], match_url) ## 如果不满足过滤规则则跳过 if not url_filter(full_url, url_type='asset'): continue - local_link = trans_to_local_link(full_url, False) - - if callback: callback(full_url, origin_url, depth) + local_link = trans_to_local_link(full_url, 'asset') + new_task = { + 'url': full_url, + 'url_type': 'asset', + 'refer': task['url'], + 'depth': task['depth'] + 1, + 'failed_times': 0, + } + if callback: callback(new_task) content = content.replace(match_url, local_link) return content.encode('utf-8') diff --git a/settings.py b/settings.py index b925bb2..b3c1a32 100644 --- a/settings.py +++ b/settings.py @@ -1,7 +1,7 @@ import logging # 要爬取的网站url, 需要以http(s)开头 -main_url = 'https://m.heiyeba.com' +main_url = 'https://m.xieeda.com/' # 设置代理 # 代理格式: @@ -17,7 +17,7 @@ headers = { } # 输出站点文件的路径,最后要加 '/' -output_path = './output/' +site_path = './sites/' site_db = 'site.db' # 每次请求的最大超时时间 @@ -34,7 +34,7 @@ wait_time = [1, 3] # 爬取页面的深度, 从1开始计, 爬到第N层为止. # 1表示只抓取单页, 0表示无限制 -max_depth = 2 +max_depth = 1 # 请求出错最大重试次数(超时也算出错) max_retry_times = 5 @@ -49,7 +49,7 @@ logging_config = { outsite_asset = True no_js = True no_css = False -no_images = True +no_images = False no_fonts = False ## 黑名单, 列表类型. 规则格式为正则, 默认为空. 
black_list = [] diff --git a/task.py b/task.py new file mode 100644 index 0000000..6804dc2 --- /dev/null +++ b/task.py @@ -0,0 +1,3 @@ +class Task(): + def __init__(self, **kwargs): + pass \ No newline at end of file diff --git a/transform.py b/transform.py new file mode 100644 index 0000000..57654aa --- /dev/null +++ b/transform.py @@ -0,0 +1,79 @@ +import os +from urllib.parse import urlparse, unquote + +from utils import get_main_site, special_chars + +def trans_to_local_link_for_page(urlObj): + origin_path = urlObj.path + origin_query = urlObj.query + + local_link = origin_path + if local_link == "": local_link = 'index.html' + if local_link.endswith('/'): local_link += 'index.html' + if origin_query != '': + query_str = origin_query + for k, v in special_chars.items(): + if k in query_str: query_str = query_str.replace(k, v) + local_link = local_link + special_chars['?'] + query_str + if not local_link.endswith('.html') and not local_link.endswith('.htm'): + local_link += '.html' + return local_link + +def trans_to_local_link_for_asset(urlObj): + origin_path = urlObj.path + origin_query = urlObj.query + + local_link = origin_path + if local_link == "": local_link = 'index' + if local_link.endswith('/'): local_link += 'index' + if origin_query != '': + query_str = origin_query + for k, v in special_chars.items(): + if k in query_str: query_str = query_str.replace(k, v) + local_link = local_link + special_chars['?'] + query_str + return local_link + +def trans_to_local_link(url, url_type = 'page'): + ''' + @param + url: 待处理的url, 有时url为动态链接, 包含&, ?等特殊字符, 这种情况下需要对其进行编码. + is_page: 是否为页面, 包含.php, .asp等动态页面, 区别于常规静态文件. 我们需要根据这个参数判断是否需要对其加上.html后缀. + @return + local_link: 本地文件存储路径, 用于写入本地html文档中的link/script/img/a等标签的链接属性 + ''' + ## 对于域名为host的url, 资源存放目录为output根目录, 而不是域名文件夹. 默认不设置主host + main_site = get_main_site() + + urlObj = urlparse(url) + origin_host = urlObj.netloc + local_link = '' + if url_type == 'page': + local_link = trans_to_local_link_for_page(urlObj) + else: + local_link = trans_to_local_link_for_asset(urlObj) + + ## 如果该url就是当前站点域名下的,那么无需新建域名目录存放. + ## 如果是其他站点的(需要事先开启允许下载其他站点的配置), + ## 则要将资源存放在以站点域名为名的目录下, 路径中仍然需要保留域名部分. + ## 有时host中可能包含冒号, 需要转义. + if origin_host != main_site: + local_link = '/' + origin_host.replace(':', special_chars[':']) + local_link + + ## url中可能包含中文, 需要解码. 
+ local_link = unquote(local_link) + return local_link + +def trans_to_local_path(url, url_type = 'page'): + ''' + @return + file_path: 目标文件的存储目录, 相对路径(不以/开头), 为""时, 表示当前目录 + file_name: 目标文件名称 + ''' + local_link = trans_to_local_link(url, url_type) + ## 如果是站外资源, local_link可能为/www.xxx.com/static/x.jpg, + ## 但我们需要的存储目录是相对路径, 所以需要事先将链接起始的/移除 + if local_link.startswith('/'): local_link = local_link[1:] + file_dir = os.path.dirname(local_link) + file_name = os.path.basename(local_link) + + return file_dir, file_name diff --git a/utils.py b/utils.py index 3bdfcdc..59ae1f2 100644 --- a/utils.py +++ b/utils.py @@ -1,12 +1,11 @@ import os import re -import hashlib import logging -from urllib.parse import urlparse, unquote +from urllib.parse import urlparse import requests -from settings import main_url, headers, proxies, output_path, logging_config, outsite_asset +from settings import main_url, headers, proxies, site_path, logging_config, outsite_asset from settings import no_js, no_css, no_images, no_fonts, black_list logging.basicConfig(**logging_config) @@ -36,28 +35,36 @@ def get_main_site(): main_site = urlparse(main_url).netloc return main_site -def request_get_async(url, refer): +def request_get_async(task): ''' 协程形式发起get请求 return: requests.get()的结果 ''' try: _headers = headers.copy() - _headers['Referer'] = refer.encode('utf-8') - resp = requests.get(url=url, verify=True, headers=_headers, proxies=proxies) + _headers['Referer'] = task['refer'].encode('utf-8') + request_options = { + 'url': task['url'], + 'verify': True, + 'headers': _headers, + 'proxies': proxies, + } + resp = requests.get(**request_options) return (1, resp) except requests.exceptions.ConnectionError as err: - logger.error('连接异常 %s : %s' % (url, err)) + msg = '连接异常: task: {task:s}, err: {err:s}' + logger.error(msg.format(task = str(task), err = err)) return (0, err) except Exception as err: - logger.error('请求失败 %s: %s' % (url, err)) + msg = '请求失败: task: {task:s}, err: {err:s}' + logger.error(msg.format(task = str(task), err = err)) return (0, err) def save_file_async(file_path, file_name, byte_content): ''' 写入文件, 事先创建目标目录 ''' - path = output_path + file_path + path = site_path + file_path if not path.endswith('/'): path = path + '/' if not os.path.exists(path): os.makedirs(path) @@ -67,64 +74,10 @@ def save_file_async(file_path, file_name, byte_content): file.close() return (1, None) except IOError as err: - logger.error('Save Error: %s, path: %s, name: %s' % (err, path, file_name)) + msg = '保存文件失败: path: {path:s}, file: {file:s}, err: {err:s}' + logger.error(msg.format(path = path, file = file_name, err = err)) return (0, err) -def trans_to_local_link(url, is_page = True): - ''' - @param - url: 待处理的url, 有时url为动态链接, 包含&, ?等特殊字符, 这种情况下需要对其进行编码. - is_page: 是否为页面, 包含.php, .asp等动态页面, 区别于常规静态文件. 我们需要根据这个参数判断是否需要对其加上.html后缀. - @return - local_path: 本地文件存储路径, 用于写入本地html文档中的link/script/img/a等标签的链接属性 - ''' - ## 对于域名为host的url, 资源存放目录为output根目录, 而不是域名文件夹. 
默认不设置主host - main_site = get_main_site() - - urlObj = urlparse(url) - origin_host = urlObj.netloc - origin_path = urlObj.path - origin_query = urlObj.query - - local_path = origin_path - if local_path == "": local_path = 'index.html' - if local_path.endswith('/'): local_path += 'index.html' - - if origin_query != '': - query_str = origin_query - for k, v in special_chars.items(): - if k in query_str: query_str = query_str.replace(k, v) - local_path = local_path + special_chars['?'] + query_str - - if is_page and not local_path.endswith('.html') and not local_path.endswith('.htm'): - local_path += '.html' - - ## 如果该url就是当前站点域名下的,那么无需新建域名目录存放. - ## 如果是其他站点的(需要事先开启允许下载其他站点的配置), - ## 则要将资源存放在以站点域名为名的目录下, 路径中仍然需要保留域名部分. - ## 有时host中可能包含冒号, 需要转义. - if origin_host != main_site: - local_path = '/' + origin_host.replace(':', special_chars[':']) + local_path - - ## url中可能包含中文, 需要解码. - local_path = unquote(local_path) - return local_path - -def trans_to_local_path(url, is_page = True): - ''' - @return - file_path: 目标文件的存储目录, 相对路径(不以/开头), 为""时, 表示当前目录 - file_name: 目标文件名称 - ''' - local_link = trans_to_local_link(url, is_page) - ## 如果是站外资源, local_link可能为/www.xxx.com/static/x.jpg, - ## 但我们需要的存储目录是相对路径, 所以需要事先将链接起始的/移除 - if local_link.startswith('/'): local_link = local_link[1:] - file_dir = os.path.dirname(local_link) - file_name = os.path.basename(local_link) - - return file_dir, file_name - def url_filter(url, url_type = 'page'): ''' @function 这个函数对url比对所有设置的规则, 判断目标url是否可以抓取. diff --git a/worker_pool.py b/worker_pool.py index 5128a72..c1310b0 100644 --- a/worker_pool.py +++ b/worker_pool.py @@ -7,7 +7,7 @@ from gevent.pool import Pool from utils import logger class WorkerPool: - def __init__(self, queue, func = None, pool_max=100, worker_type = 'page'): + def __init__(self, queue, func = None, pool_max = 100, worker_type = 'page'): self.queue = queue self.worker = func self.exit_signal = False @@ -16,30 +16,35 @@ class WorkerPool: self.pool = Pool(pool_max) self.worker_type = worker_type - def start(self, page_url = ''): - if self.worker_type == 'asset' and page_url != '': - logger.debug('asset worker pool start for page: %s' % page_url) + def start(self, page_task = None): + if self.worker_type == 'asset': + msg = '静态资源工作池启动, 所属页面: {:s}' + logger.debug(msg.format(page_task['refer'])) while True: if self.exit_signal: break if not self.queue.empty(): - item = self.queue.pop() - logger.debug('从队列中取出成员: %s, 调用worker' % str(item)) - self.pool.spawn(self.worker, *item) + task = self.queue.pop() + msg = '从队列中取出成员, 调用worker. task: {task:s}' + logger.debug(msg.format(task = str(task))) + self.pool.spawn(self.worker, task) elif self.pool.free_count() != self.pool.size: ## 如果队列已空, 但是协程池还未全部空闲, 说明仍有任务在执行, 等待. free = self.pool.free_count() total = self.pool.size working = total - free - logger.debug('pool worker usage: %d/%d, page url: %s' % (working, total, page_url)) + if self.worker_type == 'asset': + msg = '工作池使用率: {working:d}/{total:d}, page_task: {page_task:s}' + logger.debug(msg.format(working = working, total = total, page_task = str(page_task))) sleep(1) elif self.exit_signal: ## 如果队列为空, 且各协程都已空闲, 或是触发了stop()方法, 则停止while循环 break else: break - if self.worker_type == 'asset' and page_url != '': - logger.debug('asset worker pool stop for page: %s' % page_url) + if self.worker_type == 'asset': + msg = '静态资源工作池结束, 所属页面: {:s}' + logger.debug(msg.format(page_task['refer'])) def stop(self): self.exit_signal = True -- Gitee
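
For reference, a minimal usage sketch (not part of the patches) of the two interfaces this series reworks: the task dict built in crawler.py / page_parser.py and the local-path mapping moved into transform.py. The URL below is only an illustration; the exact escaping of '?' and ':' depends on special_chars in utils.py, and main_url in settings.py decides whether a host directory is prepended.

    from transform import trans_to_local_link, trans_to_local_path

    # Shape of a task as enqueued by Crawler.enqueue_page() / enqueue_asset().
    task = {
        'url': 'https://m.xieeda.com/news/1.html?page=2',  # illustrative URL
        'url_type': 'page',
        'refer': 'https://m.xieeda.com/',
        'depth': 2,
        'failed_times': 0,
    }

    # Link rewritten into the saved HTML (query characters escaped, '.html' appended for pages).
    local_link = trans_to_local_link(task['url'], task['url_type'])

    # Relative directory and file name handed to save_file_async(), stored under site_path.
    file_dir, file_name = trans_to_local_path(task['url'], task['url_type'])
    print(local_link, file_dir, file_name)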
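
And a small sketch illustrating the problem described in the CacheQueue comment from the first patch: queue.Queue holds thread locks internally, so it cannot be copied or pickled for the periodic backup, which is why a list-based queue is used instead. The exact error message varies slightly between Python 3 versions.

    import copy
    import queue

    q = queue.Queue()
    q.put('https://m.xieeda.com/')

    try:
        copy.deepcopy(q)  # the internal _thread.lock cannot be pickled/copied
    except TypeError as err:
        print(err)        # e.g. "cannot pickle '_thread.lock' object"

    # A list subclass such as CacheQueue copies without any of this ceremony.
    backup = copy.deepcopy(['https://m.xieeda.com/'])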