# sparrow_cloud **Repository Path**: sparrow614/sparrow_cloud ## Basic Information - **Project Name**: sparrow_cloud - **Description**: 基于k8s的微服务框架 - **Primary Language**: Python - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 5 - **Created**: 2021-10-14 - **Last Updated**: 2025-06-30 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## sparrow cloud 组件介绍 ## ### Django SDK * RestClient : 封装了request包和服务发现,正确请求返回解析后json数据, 错误请求返回HTTPException * RequestsClient : 封装了request包和服务发现, 返回原生的request结果 * Message_Client : 将任务发送到rabbitmq, server端未开源 * Rabbitmq_Consumer : rabbitmq消息消费端,server端未开源 * Table_API : 接收查询条件返回 django model 序列化后的数据 * Api Schema Register : django subcommand, 主动注册API 描述到文档服务, server端未开源 * access_control verify : 访问控制验证,服务端未开源 * get_user_token : 获取用户token * get_app_token : 获取app token * ly_msg : 发送app 消息 * distributed_lock : 添加分布式锁或移除锁 ### Django Middleware ### * JWTMiddleware : 解析 JWT Token * MethodConvertMiddleware : 兼容不支持 put/delete 请求 * ExceptionMiddleware : 异常通知 * TracingMiddleware : 追踪链 * CheckLockMiddleware : 前端防重复提交锁中间件 ### rest_framework 中间件 ### * UserID Authentication: 验证 user ## sparrow cloud组件 ## [RestClient](#restclient) [RequestsClient](#requestsclient) [Message_Client](#message_client) [Rabbitmq_Consumer](#rabbitmq_consumer) [Table_API](#table_api) [Api Schema Register](#api-schema-register) [access_control_verify](#access_control_verify) [access_control_register](#access_control_register) [get_user_token](#get_user_token) [get_app_token](#get_app_token) [app_message](#app_message) [distributed_lock](#distributed_lock) ## LogFilter [log_filter](#log_filter) ## django中间件 ## [JWTMiddleware](#jwtmiddleware) [MethodConvertMiddleware](#method_middleware) [ExceptionMiddleware](#exceptionmiddleware) [TracingMiddleware](#tracingmiddleware) [CheckLockMiddleware](#CheckLockMiddleware) [LogMiddleware](#LogMiddleware) ## rest_framework中间件 ## [UserID Authentication](#useridauthentication) ## installation ## pip3.11 install sparrowcloud ## 测试运行 ## 运行所有测试: source tests/mock_configmap.sh && python3.11 -m unittest tests/test* access_control/test* 运行单个测试: source tests/mock_configmap.sh && python3.11 -m unittest tests/test_rest_client.py ## 运行环境 ## sparrowcloud 5.x.x 基于django4.2.x, python3.11 sparrowcloud 4.x.x 基于django3.2.x, python3.7 ## JWTMiddleware > 描述:Token 解析 > 注意:配置SC_JWT_PUBLIC_KEY环境变量,rsa公钥文件数据 > 配置 JWTMiddleware 中间件需要的参数 ``` python # 注册中间件 MIDDLEWARE = ( 'sparrow_cloud.middleware.jwt_middleware.JWTMiddleware', # 放最上层 ``` ## UserIDAuthentication > 描述: user_id 解析 > 配置 UserIDAuthentication 认证需要的参数(仅兼容django2.2以上版本) ``` python SPARROW_AUTHENTICATION = { "USER_CLASS_PATH": "sparrow_cloud.auth.user.User", } # 参数说明: USER_CLASS_PATH: 路径中的User为中间件的User模版, 可以根据自己的需求重新创建User, 并将自己的 User路径按照模版格式放到:USER_CLASS_PATH下 # 注册中间件 REST_FRAMEWORK = { 'DEFAULT_AUTHENTICATION_CLASSES': ( 'sparrow_cloud.auth.user_id_authentication.UserIDAuthentication', ), } ``` ## METHOD_MIDDLEWARE > 兼容阿里不支持 put/delete 请求 > 配置METHOD_MIDDLEWARE需要的参数 ``` python # 注册 METHOD_MIDDLEWARE MIDDLEWARE_CLASSES = ( 'sparrow_cloud.middleware.methodconvert.MethodConvertMiddleware', #兼容阿里请求方式中间件 ) ``` ## TracingMiddleware > 兼容阿里不支持 put/delete 请求 > 配置METHOD_MIDDLEWARE需要的参数 ``` python MIDDLEWARE_CLASSES = ( 'sparrow_cloud.middleware.tracing_middleware.TracingMiddleware', # 追踪链 ) ``` ## CheckLockMiddleware > 防前端重复提交中间件,需要前端配合使用 > 对请求header中的特定键值判断其状态,来决定是否阻止该次请求 ```python MIDDLEWARE_CLASSES = ( 'sparrow_cloud.middleware.lock_middleware.CheckLockMiddleware', # 防前端重复提交 ) ``` ## restclient > 微服务间调用,使用示例 ``` python from sparrow_cloud.restclient import rest_client rest_client.post(service_address, api_path, timeout=30, token=None, json=api_list) ``` ### 参数说明 ``` service_address = "test-svc:8000" timeout: 非必传,默认超时时间30秒 传参方式: timeout=30 # 30秒为connect 和 read 的 timeout timeout=(5, 5) # 分别定制:connect 和 read 的 timeout timeout=None # Request 永远等待 token: 内部跨服务调用认证token from sparrow_cloud.authorization.token import get_app_token, get_user_token token = get_user_token(user_id="21424kvjbcdjslafds") 或者 token = get_app_token() 其它: 剩余参数与 requests.get/post 等方法保持一致 ``` ### 异常返回 ``` 当服务间调用异常时,抛出异常 HTTPException ``` ## requestsclient > 服务调用(返回结果未封装) ``` python from sparrow_cloud.restclient import requests_client requests_client.post(service_address, api_path, timeout=30, token=None, json=api_list) ``` ### 参数说明 ``` service_address = "test-svc:8000" timeout: 非必传,默认超时时间30秒 传参方式: timeout=30 # 30秒为connect 和 read 的 timeout timeout=(5, 5) # 分别定制:connect 和 read 的 timeout timeout=None # Request 永远等待 token: 内部跨服务调用认证token from sparrow_cloud.authorization.token import get_app_token, get_user_token token = get_user_token(user_id="21424kvjbcdjslafds") 或者 token = get_app_token() 其它: 剩余参数与 requests.get/post 等方法保持一致 ``` ## message_client > 麻雀任务发送 > 1. 注册消息 2. 发送消息 ``` python # 调用方式 from sparrow_cloud.message_service.sender import send_task_v3 # 发送消息 data = send_task_v3(message_code=message_code, *args, **kwargs) # 发送延时消息 data = send_task_v3( message_code=message_code, # message_code: 消息码 delay_time=200, # delay_time: 延时时间,单位为秒 *args, **kwargs) =====================以下为旧版调用方式,不建议继续使用,会逐步弃用=========================== # 调用方式 from sparrow_cloud.message_service.sender import send_task # 发送消息 data = send_task( exchange=exchange, # 交换机 routing_key=routing_key, # 路由 message_code=message_code, # 消息码 retry_times=3, # 重试次数,非必填,默认重试次数为3次(每次间隔1秒) *args, **kwargs) # 发送延时消息 data = send_task( exchange=exchange, # 交换机 routing_key=routing_key, # 路由 message_code=message_code, # 消息码 retry_times=3, # 重试次数,非必填,默认重试次数为3次(每次间隔1秒) delay=True, # 发送延时消息,默认为False,表示立即发送。如果设为True,则根据delay_time来设定延时时间 delay_time=200, # 延时时间,单位为秒 *args, **kwargs) ``` ## rabbitmq_consumer > 麻雀任务消费 > 1. 获取队列 2. 消费任务 ``` python # 1. settings 注册服务 INSTALLED_APPS = [ "sparrow_cloud.apps.message_service", ] # 2. settings 配置消费者对列 QUEUE_CONF_1 = { # 队列的配置,QUEUE_CONF_1可自定义名称 "QUEUE": "", # 队列名称 "TARGET_FUNC_MAP": { # 队列消费任务(字典中的键为message code,对应的值为执行该消息的任务函数路径字符串) "message_code": "path", # message_code: 消息代码。path: 消费消息的函数路径,如: django模块名称.文件名.函数方法名 }, } # 消费者启动命令: python3.11 manage.py rabbitmq_consumer --queue QUEUE_CONF_1 # --queue: 指定发送队列配置名称,settings QUEUE_CONF_1的名称 ``` ## table_api > 接受查询条件返回django model 序列化后的数据 > 分为server端和client端 ``` python # server 端配置 # settings注册服务 INSTALLED_APPS = [ "sparrow_cloud.apps.table_api", ] # url配置 urlpatterns = [ path('table/api/', include("sparrow_cloud.apps.table_api.urls")), ] # client端调用 from sparrow_cloud.restclient import rest_client service_address = "sparrow-demo:8000" payload = { "app_lable_model":"app_lable.model", "filter_condition":{"product_id":"74101"} } response = rest_client.get(service_address, api_path='/table/api/', json=payload) # 返回的数据结构:{'code': 0, 'message': 'ok', 'data': [{}]} # ps: # app_lable_model: app_name.model(app_name:INSTALLED_APPS里面注册的服务的名字, model:app_lable下的model名字,不区分大小写) # filter_condition: 过滤数据, kwargs # server端使用orm filter查询数据,当前版本不支持order_by ``` ## API SCHEMA REGISTER >描述:主动注册API 描述到文档服务 配置schema_command 需要的参数 ``` python # settings 配置: INSTALLED_APPS = [ "sparrow_cloud.apps.schema_command", # 注册schema_command到INSTALLED_APPS中 ] # schema_command依赖配置 SERVICE_CONF = { "NAME": "", # 使用sparrowcloud的服务名称 "SECRET": "" } # 调用方式: python3 manage.py register_api_schema # 使用说明: # 1、view支持@api_view注解方式,view_class支持GenericApiView,GenericViewSet及其子类 # 2、接口描述书写在view函数或者view_class的__doc__上,建议使用markdown格式,展示更美观 ``` >接口描述代码示例 ```python from rest_framework.decorators import api_view from rest_framework.generics import RetrieveUpdateDestroyAPIView from rest_framework import generics from rest_framework.viewsets import ModelViewSet @api_view(('GET',)) def get_user(request): """ ### 获取用户信息 #### 请求参数 id, 用户id 返回 { "user_id":"1", # 用户ID "user_name":"Tom" # 用户名称 } """ class UserApiView(RetrieveUpdateDestroyAPIView, generics.GenericAPIView): """ get: ### 查询用户信息 ### 请求参数 id, 用户id 返回 { "id":"1", # 用户ID "user_name":"Tom" # 用户名称 } delete: ### 删除用户 ### 路径参数 id 用户id 返回 404 用户id不存在 204 删除成功 """ def put(self, request, *args, **kwargs): """ ### 覆盖修改用户 ### 请求参数 { "id":"1", # 用户ID "user_name":"Tom" # 用户名称 } 返回 200 修改成功 """ return super(UserApiView, self).put(self, request, *args, **kwargs) class CarViewSet(ModelViewSet): """ list: 分页查询车辆 retrieve:获取车辆信息 update: 覆盖修改车辆 partial_update: 部分修改车辆 create: 创建车辆 destroy: 删除车辆 """ ``` ## ExceptionMiddleware > 中间件 > 说明: 可自动捕获程序异常,并将详细错误信息发送到飞书 ``` python # settings 配置 MIDDLEWARE = [ "sparrow_cloud.middleware.exception.ExceptionMiddleware" ] ``` ## LogMiddleware > 日志中间件 > 日志中自动打印请求的request_id。日志的request_id与链路追踪中的request_id一致,可结合链路追踪中搜索快速定位异常请求信息。 > 此中间件需要配合log_filter 使用,才能在每条业务逻辑日志中插入request_id, 如单独使用只能透request_id ``` python # settings 配置 MIDDLEWARE = [ "sparrow_cloud.middleware.log_middleware.RequestIDMiddleware" ] ``` ## log_filter > log_filter(自定义logfilter,从线程中获取request_id,从每条日志中打印出来) > 需要与LogMiddleware 中间件配合使用,否则不生效 ``` python # log_settings 配置 LOGGING = { 'version': 1, 'disable_existing_loggers': False, 'filters': { # 新增的配置 'request_id': { '()': 'sparrow_cloud.filter.log_filters.RequestIDFilter' } }, 'handlers': { 'console': { 'level': 'DEBUG', 'class': 'logging.StreamHandler', 'formatter': 'standard', # 需要修改的的配置 'filters': ['request_id'], # 增加的 log_filter 的配置 }, # 修改后的业务日志配置示例 'access_control': { 'class': 'logging.handlers.RotatingFileHandler', 'filters': ['request_id'], # 增加的配置 'level': 'DEBUG', 'formatter': 'standard', # 修改为新增的formatters 配置 'filename': os.path.join(os.path.sep, BASE_DIR_LOG, 'access_control.log'), 'mode': 'a', 'maxBytes': 1024 * 1024 * 5, 'backupCount': 1, # 修改 }, }, 'formatters': { # 增加的配置 'standard': { 'format': '%(levelname)s %(asctime)s %(request_id)s %(lineno)d %(name)s %(module)s %(funcName)s %(process)d %(thread)d %(message)s' }, }, # sparrow_cloud 的日志 'sparrow_cloud': { 'handlers': ['console', 'sparrow_cloud'], 'level': 'DEBUG', 'propagate': False, }, # 业务日志配置 'access_control': { 'handlers': ['console', 'access_control'], 'level': 'DEBUG', 'propagate': False, } } } ``` ## ACCESS_CONTROL_VERIFY > access_control_verify decorators (访问控制验证) ``` python # settings 配置 SERVICE_CONF = { "NAME": "", # value为本服务的注册名称 "SECRET": "", } # 访问控制client端settings配置 # SC_SKIP_ACCESS_CONTROL: 是否跳过访问控制, True:跳过, false:不跳过 # 使用方式 # 函数视图使用方式示例 from sparrow_cloud.access_control.decorators import access_control_fbv @api_view(('POST', 'GET', 'PUT', 'DELETE')) @access_control_fbv("permission_example1") # 位置放到最下层 def test(request, *args, **kwargs): return Response({"message": "ok"}, status=status.HTTP_200_OK) # 类视图使用方式(全部方法都验证) from sparrow_cloud.access_control.decorators import access_control_cbv_all @access_control_cbv_all("permission_example1") class ProductOperationList(generics.ListCreateAPIView): """请求方法:GET/POST""" pass # 类视图使用方式(根据method验证) from sparrow_cloud.access_control.decorators import access_control_cbv_method RESOURCE = { "post": "permission_example1", "get": "permission_example2" } @access_control_cbv_method(RESOURCE) class ProductOperationList(generics.ListCreateAPIView): """请求方法:GET/POST""" pass ``` ## get_user_token > get_user_token (获取用户token) ```python # 获取用户token from sparrow_cloud.authorization.token import get_user_token user_token = get_user_token(user_id="21424kvjbcdjslafds") ``` ## get_app_token > get_app_token (获取服务token) ```python # 获取app token from sparrow_cloud.authorization.token import get_app_token app_token = get_app_token() ``` ## app_message > app_message (发送消息到揽月app, 服务端未开源) ```python #目前支持的消息类型有:纯文本、图片,Markdown、文字card和图片card # 发送图片消息到app from sparrow_cloud.app_message.sender import send_message msg_data = "http://www.test.com/image.png" res = send_message(msg_data=msg_data, code_type="test", content_type="image", msg_sender="麻雀") # 发送文本消息到app from sparrow_cloud.app_message.sender import send_message msg_data = "文本消息" res = send_message(msg_data=msg_data, code_type="test", nickname="文本消息携带字段") # 发送文本card消息到app from sparrow_cloud.app_message.sender import send_message msg_data = "卡片消息内容" res = send_message(msg_data=msg_data, code_type="test", content_type="card_text", title="通知") ## 参数说明 ## msg_data: 消息主体。填充待发送的消息格式msg中的data数据 ## code_type: 申请的code ## content_type: 发送消息的类型,非必传,默认是text文本类型。目前支持"text","image","markdown","card_text","card_image". ## msg_sender: app中展示的发送消息服务的名称, 非必传,如不传取 service_name ## shop_id: 非必传,默认为None,根据自己的需求 ## user_id_list: 非必传,默认为空列表,根据自己的需求 ## nickname: 非必传,根据发送的消息类型content_type决定是否传递 ## title: 非必传,根据发送的消息类型content_type决定是否传递 ``` ## distributed_lock > add_lock (添加锁) > remove_lock (移除锁) ```python from sparrow_cloud.distributed_lock.lock_op import add_lock, remove_lock #添加锁 res = add_lock("lock_key", 100) if res.get("code") != 0: #加锁失败 ##添加锁参数说明 ## key: 加锁的key值 ## exexpire_time: 必须是int型,表示超时时间。如果在超过这个时间没有调用移除锁,则会自动释放锁。所以该时间需要根据业务层处理时间尽可能准确 ## 返回字典结构为{"code":0/-1, "message":"xxxx"}或者{"message":"xxx"} ## 如果没有code字段表示出错,需要查看message。 ## code为0表示加锁成功。其他数值表示加锁失败,停止执行之后的业务逻辑。 ## message表示具体的信息 # 移除锁 res = remove_lock("lock_key") if res.get("code") != 0: #移除锁失败 ##移除锁参数说明 ## key: 移除锁的key值 ## 返回字典结构为{"code":0/-1, "message":"xxxx"}或者{"message":"xxx"} ## 如果没有code字段表示出错,需要查看message。 ## code为0表示移除锁成功。其他数值表示移除锁失败 ## message表示具体的信息,“1”表示移除成功,“0”表示没有该key值。 ``` ## Stargazers over time [![Stargazers over time](https://starchart.cc/hanguangbaihuo/sparrow_cloud.svg)](https://starchart.cc/hanguangbaihuo/sparrow_cloud)