# TrafficBySparkAndKafka **Repository Path**: retrofit/TrafficBySparkAndKafka ## Basic Information - **Project Name**: TrafficBySparkAndKafka - **Description**: TrafficBySparkAndKafka是一个关于交通监控项目,实现了对车流量,道路转换和实时拥堵统计。 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 10 - **Created**: 2022-04-26 - **Last Updated**: 2022-04-26 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## 一、项目简介 TrafficBySparkAndKafka是一个车辆监控项目。主要实现了三个功能: 1.计算每一个区域车流量最多的前3条道路。 2.计算道路转换率 3.实时统计道路拥堵情况(当前时间,卡口编号,车辆总数,速度总数,平均速度) ## 二、项目结构 ``` ├─TrafficBySparkAndKafka ├─data └─src ├─main │ ├─java │ │ └─vip │ │ ├─producedate2hive(模拟数据到文件和Hive) │ │ ├─shuai7boy │ │ │ └─trafficTemp │ │ │ ├─areaRoadFlow(每个区域top3道路速度统计。道路转换率。) │ │ │ ├─conf (获取配置文件帮助类) │ │ │ ├─constant (接口静态类,防止硬编码) │ │ │ ├─dao │ │ │ │ ├─factory (工厂类) │ │ │ │ └─impl (接口实现类) │ │ │ ├─domain (属性定义类) │ │ │ ├─jdbc (jdbc帮助类) │ │ │ ├─rtmroad(实时统计道路拥堵情况) │ │ │ ├─skynet │ │ │ └─util (帮助类) │ │ └─spark │ │ └─spark │ │ └─test (模拟实时数据) │ ├─resources │ └─scala │ └─top │ └─shuai7boy │ └─trafficTemp │ └─areaRoadFlow (利用scala和java互调用,实现top3道路速度统计) │ └─test └─java ``` ## 三、数据源 **数据源类型:** monitor_flow_action(每个摄像头的监控数据) 当天日期 卡口编号 摄像头编号 车牌号 拍摄时间 车速 道路编号 区域编号 ``` 2020-05-08 0001 34287 京M80025 2020-05-08 05:35:58 57 25 03 2020-05-08 0005 99132 京M80025 2020-05-08 05:51:28 149 50 04 ``` monitor_camera_info(卡口和摄像头对应编号) ``` 0006 00443 0006 25745 0006 98681 0006 36400 ``` **存储介质:** 如果在本地运行的话,这里读取的是本地文件。 如果在集群运行,对于批处理读取的是Hive,对于流处理这里读取Kafka。 ## 四、数据转换流程 1.计算每一个区域车流量最多的前3条道路。 - 从表traffic.monitor_flow_action根据日期获取车流量监控日志信息。 挡在集群中时,traffic.monitor_flow_action代表的是hive中的表,当在本地运行时,traffic.monitor_flow_action是本地创建的临时表。 - 从area_info表中获取区域信息。 area_info是MySql中的表。 - 根据步骤二获取的区域信息,补全监控日志名称。根据join,map即可拼接一个新的RDD,并将RDD转换为DataFrame的临时表tmp_car_flow_basic。 - 统计各个区域的道路车流量。 使用Spark SQL根据区域名称,道路ID进行分组。即可统计出每个区域,每条道路对应的车流量。 - 统计每个区域top3车流量。 利用开窗函数进行统计。row_number() over(partition by area_name order by road_id desc) 用到的技术:Hive,Spark SQL,临时表,MySql,JDBC,join,map,RDD转换DataFrame。 2.计算道路转换率 - 从MySql拿出我们要对比的转换路段 - 从日志拿出指定日期的监控数据 - 将监控数据转换为键值对(car,row)格式 - 计算每个路段的匹配情况。 逻辑:将第三步拿到的数据,根据car进行分组,映射键值对。将轨迹信息根据时间进行排序,然后拼接。 将我们指定的路段(第一步获取到的)和上面拼接的数据进行比对,得出匹配情况。(路段,匹配次数) - 因为上面求的是多辆车的 (路段,匹配次数)。这步使用reduceByKey进行聚合,将相同路段进行汇总。 - 获取转化率。 转换率=(这次路段的匹配度)/(上次路段的匹配度)即可得到。、 这次路段的匹配度=(聚合数据.get(路段)) 用到的技术:mapToPair,groupByKey,flatMapToPair(进来一辆车,出去多个对应路段信息),reduceByKey。 3.实时统计道路拥堵情况(根据车辆和车速判断) - 根据日志获取(卡口ID,汽车速度)格式数据 - 获取(卡口ID,(汽车速度,1))格式数据,后面的1代表车辆数 - 获取(卡口ID,(汽车总速度,总车辆数)) - 打印车辆(卡口,总速度,总车辆,平均速度) 用到的技术:map,mapToPair,mapValues(仅仅针对value进行map,(key,(value,1))格式数据),reduceByKeyAndWindow。 [大数据系列传送门]( https://www.cnblogs.com/shun7man/p/11518083.html?_blank )