# streaming_ratings **Repository Path**: FormatFa/streaming_ratings ## Basic Information - **Project Name**: streaming_ratings - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2019-11-26 - **Last Updated**: 2020-12-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## SparkStreaming 实时统计评分数量Top 10的电影 - 数据源 通过读取本地评分文件(ratings.csv)发送给SparkStreaming接收,持续几分钟,模拟不断有人评分 通过写入到指定端口的socket来模拟 - SparkStreaming处理 streaming程序使用`socketTextStream`监听对应端口,得到数据,将评分数量累计的Top 15 保存到mysql表中 - 后端前端 flask 后端在指定路由 读取 mysql,返回数据,前端不断请求Top15的路由,得到结果显示在浏览器上 数据集来源: https://grouplens.org/datasets/movielens/ 数据格式: - `movies.csv` ``` movieId,title,genres 1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy 2,Jumanji (1995),Adventure|Children|Fantasy ``` 电影id,标题,类型 - `ratings.csv` ``` userId,movieId,rating,timestamp 1,1,4.0,964982703 ``` 用户id,电影id,评分,时间戳 - 流程(1.0版) 模拟产生数据->socket -> SparkStreaming -> (MySQL或者Redis)->Flask 后台+前端 ajax显示结果(轮询的方式) - 流程(2.0版) 模拟产生数据->Kafka 接收 -> SparkStreaming -> (MySQL或者Redis)->Flask 前端显示结果(websocket) 1. 模拟产生数据 读取数据文件`ratings.csv`向Socket 写入 2. SparkStreaming 接收 通过内置的`socketTextStream`接收数据 3. 写出结果到数据库 写出到`mysql` 或者 `redis` 4. 前端轮询数据库显示结果 flask 后台,前端jquery轮询 ### SparkStreaming 处理部分 实时统计评分数量最多的电影 模拟产生数据的代码,读取评分文件,不断向socket写入数据(一行为一个), streaming程序启动后一直在运行,每隔duration就接收一行(或几行)作为一个batch 使用updateStateByKey实现累计数量,要累计就要保存上一次的,updateStateByKey的第二个参数可以获取到之前的value1,这个函数的返回值也是作为下一次的value1,就可以实现累计 ### 截图 将Top 15 写出到mysql 中 1. 启动数据模拟 ![simulator](README/simulator.png) 2. 启动spark streaming ![](README/sparkstreaming.png) 3. 启动flask server ![](README/flask_server.png) 4. 前端查看 页面会不断刷新 ![](README/front_end.png) 数据发送完成后,如果streaming还在运行,只会显示最后的结果,前端也不会更新 ### 安装测试 项目说明 - ratingLive Streaming主程序,idea 打开 - ratingSimulator 模拟产生评分工程,idea打开 - rating_server flask后端,前端展示代码,pycharm打开 ### 打包在集群测试 单机环境 - spark 2.0.0 - hadoop 2.6.0 - ![](README/deploy_test.png)