# TrafficProject **Repository Path**: junjun_com/traffic-project ## Basic Information - **Project Name**: TrafficProject - **Description**: 智慧交通-spark入门项目 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2026-02-11 - **Last Updated**: 2026-02-11 ## Categories & Tags **Categories**: Uncategorized **Tags**: Spark ## README # 交通监控大数据分析系统 ## 项目简介 本项目是一个基于Apache Spark的交通监控大数据分析系统,主要用于处理和分析城市交通监控数据,实现卡口流量统计、车辆轨迹追踪、区域路况分析、实时监控等多项功能。 ## 技术栈 - **核心框架**: Apache Spark (Scala/Java API) - **开发语言**: Java 8 - **数据库**: MySQL - **构建工具**: Maven - **实时处理**: Kafka + Spark Streaming - **辅助工具**: JSON (fastjson) ## 项目结构 ``` src/main/java/com/bjsxt/spark/ ├── areaRoadFlow/ # 区域道路流量分析模块 │ ├── AreaTop3RoadFlowAnalyze.java # 区域道路流量Top3分析 │ ├── MonitorOneStepConvertRateAnalyze.java # 卡口转化率分析 │ ├── ConcatStringStringUDF.java # UDF: 字符串拼接 │ ├── GroupConcatDistinctUDAF.java # UDAF: 分组去重拼接 │ ├── RandomPrefixUDF.java # UDF: 添加随机前缀 │ └── RemoveRandomPrefixUDF.java # UDF: 移除随机前缀 │ ├── skynet/ # 核心监控分析模块 │ ├── MonitorFlowAnalyze.java # 卡口流量分析 │ ├── MonitorCarTrack.java # 车辆轨迹追踪 │ ├── RandomExtractCars.java # 随机车辆抽取 │ ├── WithTheCarAnalyze.java # 同行车辆分析 │ ├── MonitorAndCameraStateAccumulator.java # 状态累加器 │ ├── SelfDefineAccumulator.java # 自定义累加器 │ └── SpeedSortKey.java # 速度排序键 │ ├── rtmroad/ # 实时道路分析模块 │ └── RoadRealTimeAnalyze.java # 道路实时分析 │ ├── dao/ # 数据访问层 │ ├── factory/DAOFactory.java # DAO工厂 │ ├── impl/ # DAO实现类 │ │ ├── TaskDAOImpl.java # 任务数据访问 │ │ ├── MonitorDAOImpl.java # 监控数据访问 │ │ ├── AreaDaoImpl.java # 区域数据访问 │ │ ├── CarTrackDAOImpl.java # 车辆轨迹数据访问 │ │ ├── RandomExtractDAOImpl.java # 随机抽取数据访问 │ │ └── WithTheCarDAOImpl.java # 同行车辆数据访问 │ └── interfaces/ # DAO接口定义 │ ├── domain/ # 实体类 │ ├── Task.java # 任务实体 │ ├── Area.java # 区域实体 │ ├── CarTrack.java # 车辆轨迹实体 │ ├── MonitorState.java # 监控状态实体 │ ├── TopNMonitor2CarCount.java # TopN卡口车辆统计 │ ├── TopNMonitorDetailInfo.java # TopN卡口详情 │ ├── RandomExtractCar.java # 随机抽取车辆 │ ├── RandomExtractMonitorDetail.java # 随机抽取详情 │ └── CarInfoPer5M.java # 5分钟车辆信息 │ ├── util/ # 工具类 │ ├── DateUtils.java # 日期处理 │ ├── StringUtils.java # 字符串处理 │ ├── NumberUtils.java # 数字处理 │ ├── ParamUtils.java # 参数处理 │ └── SparkUtils.java # Spark工具 │ ├── conf/ │ └── ConfigurationManager.java # 配置管理 │ └── jdbc/ └── JDBCHelper.java # JDBC帮助类 ``` ## 核心功能模块 ### 1. 卡口流量分析 (MonitorFlowAnalyze) 对监控卡口的车流量进行统计分析,识别高流量卡口并记录详细通行车辆信息。 **主要功能:** - 按卡口统计车流量 - 识别Top N高流量卡口 - 记录车辆速度分布 - 监控设备状态统计 ### 2. 车辆轨迹追踪 (MonitorCarTrack) 追踪特定车辆在监控系统中的完整行驶轨迹。 **主要功能:** - 按车牌号追踪车辆 - 生成车辆行驶路径 - 记录经过的所有卡口信息 ### 3. 区域道路流量分析 (AreaTop3RoadFlowAnalyze) 分析各区域的交通流量,统计区域内道路流量Top3。 **主要功能:** - 区域维度流量聚合 - 区域内道路排名 - 跨区域流量对比 ### 4. 随机车辆抽取 (RandomExtractCars) 从监控数据中随机抽取车辆样本进行详细分析。 **主要功能:** - 随机抽取车辆 - 生成车辆轨迹 - 保存抽取详情到数据库 ### 5. 卡口转化率分析 (MonitorOneStepConvertRateAnalyze) 分析相邻卡口之间的车辆转化率,识别异常通行。 **主要功能:** - 计算卡口间转化率 - 识别漏拍车辆 - 生成转化率报告 ### 6. 同行车辆分析 (WithTheCarAnalyze) 分析在相同时段内出现的同行车辆,识别可疑车辆组合。 **主要功能:** - 识别同行车辆 - 同行时间窗口分析 - 同行频次统计 ### 7. 实时道路分析 (RoadRealTimeAnalyze) 基于Kafka实时数据流进行道路状态实时监控和分析。 **主要功能:** - 实时数据接收 - 实时路况计算 - 异常车辆告警 ## 编译与运行 ### 环境要求 - JDK 1.8 - Apache Spark 2.x - Maven 3.x - MySQL 5.7+ - Kafka (实时模块需要) ### 编译命令 ```bash mvn clean package -DskipTests ``` ### 运行方式 #### 1. 批次分析任务 ```bash spark-submit \ --class com.bjsxt.spark.skynet.MonitorFlowAnalyze \ --master yarn-client \ --num-executors 4 \ --driver-memory 2g \ --executor-memory 2g \ traffic-project.jar ``` #### 2. 实时分析任务 ```bash spark-submit \ --class com.bjsxt.spark.rtmroad.RoadRealTimeAnalyze \ --master yarn-client \ traffic-project.jar ``` ## 配置说明 配置文件位于 `src/main/resources/my.properties`,主要配置项: ```properties # 数据库配置 jdbc.url=jdbc:mysql://localhost:3306/traffic jdbc.user=root jdbc.password=root # Kafka配置 (实时模块) kafka.broker.list=localhost:9092 kafka.topic=traffic_monitor # Spark配置 spark.master=yarn-client spark.app.name=TrafficAnalyze ``` ## 数据库表结构 系统使用以下主要数据表: - `task_info` - 任务信息表 - `monitor_info` - 卡口信息表 - `camera_info` - 摄像头信息表 - `topn_monitor_car_count` - TopN卡口车辆统计 - `topn_monitor_detail_info` - 卡口通行详情 - `car_track` - 车辆轨迹表 - `monitor_state` - 监控状态表 - `random_extract_car` - 随机抽取车辆 - `random_extract_monitor_detail` - 抽取详情 ## 领域模型 ### Task (任务) | 字段 | 类型 | 说明 | |------|------|------| | taskId | long | 任务ID | | taskName | String | 任务名称 | | taskParams | String | 任务参数(JSON格式) | | taskStatus | String | 任务状态 | ### CarTrack (车辆轨迹) | 字段 | 类型 | 说明 | |------|------|------| | taskId | long | 关联任务ID | | car | String | 车牌号 | | date | String | 日期 | | track | String | 轨迹详情(卡口列表) | ### Area (区域) | 字段 | 类型 | 说明 | |------|------|------| | areaId | String | 区域ID | | areaName | String | 区域名称 | ## 工具类说明 ### StringUtils - `isEmpty/isNotEmpty` - 字符串空值判断 - `trimComma` - 去除末尾逗号 - `fulfuill` - 字符串补齐 - `getFieldFromConcatString` - 从拼接字符串中获取字段 - `setFieldInConcatString` - 设置拼接字符串中的字段 ### DateUtils - `formatDate/parseDate` - 日期格式化 - `formatTimeMinute` - 精确到分钟的时间格式化 - `getRangeTime` - 获取时间范围 - `getDateHour` - 获取日期小时 ## UDF/UDAF自定义函数 项目中实现了多个Spark SQL自定义函数: | 函数名 | 类型 | 功能 | |--------|------|------| | `RandomPrefixUDF` | UDF | 为字符串添加随机前缀,用于数据倾斜处理 | | `RemoveRandomPrefixUDF` | UDF | 移除随机前缀 | | `ConcatStringStringUDF` | UDF | 拼接字符串 | | `GroupConcatDistinctUDAF` | UDAF | 分组后去重拼接 | ## License 本项目仅供学习和研究使用。