1 Star 0 Fork 7

Tcc1254698/WebBlog

forked from thanatosx/WebBlog 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
app.py 6.77 KB
一键复制 编辑 原始数据 按行查看 历史
mxxim 提交于 2015-07-03 10:49 +08:00 . 。。
from datetime import datetime
import json
import os
import time
import traceback
from FrontUtils import cookie2user
import dao
from config.config_util import config
from jinja2 import FileSystemLoader, Environment
from handlers import add_routes, add_static
import exception
__author__ = 'thanatos'
import logging;
logging.basicConfig(level=logging.INFO)
import asyncio
from aiohttp import web
@asyncio.coroutine
def init(loop):
    """Create the database pool, build the web application and start serving.

    Args:
        loop: The asyncio event loop handed to the connection pool, the
            aiohttp application and ``create_server``.

    Returns:
        The server object produced by ``loop.create_server``.
    """
    yield from dao.create_pool(
        loop=loop,
        host=config.db.host, port=config.db.port,
        user=config.db.user, password=config.db.password,
        db=config.db.database)
    # NOTE(review): admin_factory and data_factory are defined in this module
    # but are not part of this middleware list -- confirm that is intentional.
    app = web.Application(loop=loop, middlewares=[
        exception_factory, logger_factory, response_factory, cookie_factory
    ])
    init_jinja2(app, path='templates', filters={
        'time_filter': time_filter
    })
    add_routes(app, 'controller')
    add_static(app)
    srv = yield from loop.create_server(
        app.make_handler(), config.server.host, config.server.port)
    # Log the configured address rather than a hard-coded one, so the message
    # stays accurate when the configuration changes.
    logging.info('server started at http://%s:%s...' % (config.server.host, config.server.port))
    return srv
def time_filter(t):
    """Format a Unix timestamp as a relative age or, when older, a date.

    Args:
        t: Timestamp in seconds since the epoch.

    Returns:
        A Chinese-language label: minutes, hours or days elapsed for
        timestamps less than a week old, otherwise the year/month/day
        obtained from ``datetime.fromtimestamp``.
    """
    elapsed = int(time.time() - t)
    # Anything under a minute (including timestamps in the future) is
    # reported as one minute ago.
    if elapsed < 60:
        return '1分钟前'
    # (exclusive upper bound in seconds, seconds per unit, label pattern)
    buckets = (
        (3600, 60, '%s分钟前'),
        (86400, 3600, '%s小时前'),
        (604800, 86400, '%s天前'),
    )
    for upper_bound, unit_seconds, pattern in buckets:
        if elapsed < upper_bound:
            return pattern % (elapsed // unit_seconds)
    moment = datetime.fromtimestamp(t)
    return '%s年%s月%s日' % (moment.year, moment.month, moment.day)
def init_jinja2(app, **kw):
    """Build a jinja2 ``Environment`` and store it on the application.

    Recognised keyword arguments:
        autoescape, block_start_string, block_end_string,
        variable_start_string, variable_end_string, auto_reload:
            forwarded to ``Environment``; defaults are listed below.
        path: template directory; when absent or ``None`` the ``templates``
            directory next to this file is used.
        filters: optional mapping of filter name to callable, registered on
            the environment.

    The environment is stored under ``app['__templating__']``.
    """
    option_defaults = (
        ('autoescape', True),
        ('block_start_string', '{%'),
        ('block_end_string', '%}'),
        ('variable_start_string', '{{'),
        ('variable_end_string', '}}'),
        ('auto_reload', True),
    )
    options = {key: kw.get(key, fallback) for key, fallback in option_defaults}

    template_dir = kw.get('path')
    if template_dir is None:
        here = os.path.dirname(os.path.abspath(__file__))
        template_dir = os.path.join(here, 'templates')
    logging.info('set jinja2 template path: %s' % template_dir)

    env = Environment(loader=FileSystemLoader(template_dir), **options)
    custom_filters = kw.get('filters')
    if custom_filters is not None:
        for filter_name, filter_func in custom_filters.items():
            env.filters[filter_name] = filter_func
    app['__templating__'] = env
@asyncio.coroutine
def admin_factory(app, handler):
    """Middleware factory that restricts ``/admin`` paths to admin users.

    The returned coroutine reads ``request.__user__`` and raises
    ``exception.ForbidError`` when the path starts with ``/admin`` and the
    user is missing or its ``isAdmin`` attribute is falsy; otherwise the
    request is passed on to ``handler``.
    """
    @asyncio.coroutine
    def admin_filter(request):
        target = request.path
        logging.info('-admin_filter--request path is %s' % target)
        if target.startswith('/admin'):
            current_user = request.__user__
            allowed = current_user and current_user.isAdmin
            if not allowed:
                raise exception.ForbidError('url is %s' % request.path)
        outcome = yield from handler(request)
        return outcome
    return admin_filter
@asyncio.coroutine
def logger_factory(app, handler):
    """Middleware factory that logs each request's method and path.

    The returned coroutine writes one INFO line and then delegates to
    ``handler``, returning whatever it returns.
    """
    @asyncio.coroutine
    def logger(request):
        line = '-logger filter--request: %s -- %s' % (request.method, request.path)
        logging.info(line)
        outcome = yield from handler(request)
        return outcome
    return logger
@asyncio.coroutine
def exception_factory(app, handler):
    """Middleware factory that turns handler exceptions into error responses.

    The returned coroutine calls ``handler``. If an ``Exception`` escapes, it
    is logged, its traceback is printed, and a response is returned instead:

    * the rendered ``404.html`` template when the request's content type
      starts with ``text/html``;
    * otherwise a JSON body ``{"code": N}`` where N is 1 by default,
      2 for ``exception.ForbidError`` and 3 for ``exception.PrivilegeError``.

    Exceptions that derive only from ``BaseException`` (``KeyboardInterrupt``,
    ``SystemExit``, ``GeneratorExit``, and task cancellation on interpreters
    where it is a ``BaseException``) are deliberately not intercepted so that
    shutdown and cancellation keep working.
    """
    @asyncio.coroutine
    def exception_filter(request):
        try:
            return (yield from handler(request))
        except Exception as e:
            logging.error('exception_filter catch %s' % e)
            traceback.print_tb(e.__traceback__)
            _json = {'code': 1}
            _template = '404.html'
            # Two independent checks: when an error matches both types the
            # later assignment (code 3) wins.
            if isinstance(e, exception.ForbidError):
                _json = {'code': 2}
            if isinstance(e, exception.PrivilegeError):
                _json = {'code': 3}
            # NOTE(review): this looks at the request's Content-Type, not its
            # Accept header -- confirm that is the intended way to detect
            # page requests.
            if request.content_type.lower().startswith('text/html'):
                resp = web.Response(body=app['__templating__'].get_template(_template).render().encode('utf-8'))
                resp.content_type = 'text/html;charset=utf-8'
                return resp
            else:
                resp = web.Response(
                    body=json.dumps(_json, ensure_ascii=False, default=lambda o: o.__dict__).encode('utf-8'))
                resp.content_type = 'application/json;charset=utf-8'
                return resp
    return exception_filter
@asyncio.coroutine
def data_factory(app, handler):
    """Middleware factory that parses POST bodies onto ``request.__data__``.

    For POST requests the returned coroutine stores the decoded JSON body
    (content type starting with ``application/json``) or the form fields
    (content type starting with ``application/x-www-form-urlencoded``) on
    ``request.__data__`` and logs it. In every other case ``__data__`` is
    left untouched. The request is then passed on to ``handler``.
    """
    @asyncio.coroutine
    def parse_data(request):
        logging.info('-data parse--')
        if request.method == 'POST':
            body_type = request.content_type
            if body_type.startswith('application/json'):
                payload = yield from request.json()
                request.__data__ = payload
                logging.info('request json: %s' % str(payload))
            elif body_type.startswith('application/x-www-form-urlencoded'):
                payload = yield from request.post()
                request.__data__ = payload
                logging.info('request form: %s' % str(payload))
        outcome = yield from handler(request)
        return outcome
    return parse_data
@asyncio.coroutine
def response_factory(app, handler):
    """Middleware factory that converts handler return values into responses.

    Conversion rules applied to the handler's result, in order:

    * ``web.StreamResponse`` instance: returned unchanged.
    * ``bytes``: body with content type ``application/octet-stream``.
    * ``str`` starting with ``redirect:``: ``web.HTTPFound`` to the rest of
      the string; any other ``str`` becomes a UTF-8 HTML body.
    * ``dict`` without a ``__template__`` key: serialised as JSON.
    * ``dict`` with ``__template__``: that template is rendered with the
      dict as its context and returned as HTML.
    * anything else: ``str(result)`` as UTF-8 plain text.
    """
    def build(payload, mime):
        # Create the response first and assign the content type afterwards.
        reply = web.Response(body=payload)
        reply.content_type = mime
        return reply

    @asyncio.coroutine
    def response(request):
        logging.info('-response filter--response handler...')
        result = yield from handler(request)
        logging.info('--resp--')

        if isinstance(result, web.StreamResponse):
            return result

        if isinstance(result, bytes):
            return build(result, 'application/octet-stream')

        if isinstance(result, str):
            if result.startswith('redirect:'):
                return web.HTTPFound(result[len('redirect:'):])
            return build(result.encode('utf-8'), 'text/html;charset=utf-8')

        if isinstance(result, dict):
            template_name = result.get('__template__')
            if template_name is not None:
                page = app['__templating__'].get_template(template_name).render(**result)
                return build(page.encode('utf-8'), 'text/html;charset=utf-8')
            text = json.dumps(result, ensure_ascii=False, default=lambda o: o.__dict__, allow_nan=False)
            return build(text.encode('utf-8'), 'application/json;charset=utf-8')

        # default:
        return build(str(result).encode('utf-8'), 'text/plain;charset=utf-8')
    return response
# Name of the cookie that carries the login session, taken from configuration.
__COOKIE_NAME = config.cookie.name


@asyncio.coroutine
def cookie_factory(app, handler):
    """Middleware factory that attaches the current user to the request.

    The returned coroutine reads the cookie named ``__COOKIE_NAME``. When it
    is present and non-empty, ``request.__user__`` is set to the result of
    ``cookie2user``; otherwise it is set to ``None``. The request is then
    passed on to ``handler``.
    """
    @asyncio.coroutine
    def cookie(request):
        logging.info('-auth filter--check user: %s %s' % (request.method, request.path))
        raw_cookie = request.cookies.get(__COOKIE_NAME)
        request.__user__ = (yield from cookie2user(raw_cookie)) if raw_cookie else None
        outcome = yield from handler(request)
        return outcome
    return cookie
# Script entry point: run init() to completion on the event loop, then keep
# the loop running so the server stays up.
# NOTE(review): these statements execute at import time because there is no
# ``if __name__ == '__main__'`` guard -- confirm this module is only ever run
# as a script.
loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/tcc1254698/WebBlog.git
git@gitee.com:tcc1254698/WebBlog.git
tcc1254698
WebBlog
WebBlog
master

搜索帮助