1 Star 0 Fork 0

taikun928/flink-kafka-master

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
json.java 9.56 KB
一键复制 编辑 原始数据 按行查看 历史
taikun928 提交于 2024-02-29 00:17 +08:00 . flink-kafka
package Source;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import processFunction.UserBehavior;
import processFunction.itemPvViewCount;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Properties;
/*
实时热门商品统计
基本需求
统计近1个小时内的热门商品,每五分钟更新一次
热门度使用浏览(“pv”)来衡量
解决思路
在所有用户行为数据中,过滤出浏览(”pv”)行为进行统计
构建滑动窗口,窗口长度为1小时,滑动距离为5分钟,统计出每一种商品的访问数
再根据滑动窗口的时间,统计出访问次数最多的5个商品
*/
public class json {
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 keeps the printed top-N output in a single, ordered stream.
        env.setParallelism(1);

        // 2. Connect to Kafka and consume the raw JSON events.
        // SimpleStringSchema is a built-in DeserializationSchema that turns each
        // record's byte[] payload into a String; a custom DeserializationSchema /
        // KeyedDeserializationSchema could be supplied instead.
        Properties properties = new Properties();
        // Kafka cluster to read from.
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
        // FIX: this is a *consumer*, so it needs a consumer group id; the original
        // code set producer serializer options (key.serializer/value.serializer),
        // which a consumer silently ignores, and omitted the required group.id.
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-hot-items");
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("userBehavior", new SimpleStringSchema(), properties);
        DataStreamSource<String> kafkaSource = env.addSource(flinkKafkaConsumer);

        // Parse each JSON line into a UserBehavior POJO, e.g.
        // {"user_id":124393,"item_id":2707570,"category_id":3898483,"behavior":"pv","timestamp":1511534024000}
        // -> UserBehavior{uerId=124393, itemId=2707570, categoryId=3898483, behavior='pv', timestamp=1511534024000}
        SingleOutputStreamOperator<UserBehavior> mapStream = kafkaSource.map(new MapFunction<String, UserBehavior>() {
            @Override
            public UserBehavior map(String value) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);
                Long uerId = jsonObject.getLong("user_id");
                Long itemId = jsonObject.getLong("item_id");
                Integer categoryId = jsonObject.getInteger("category_id");
                String behavior = jsonObject.getString("behavior");
                Long timestamp = jsonObject.getLong("timestamp");
                return new UserBehavior(uerId, itemId, categoryId, behavior, timestamp);
            }
        });

        // Assign event-time timestamps and generate watermarks.
        // Duration.ZERO: the source is assumed to be in order (no lateness allowed).
        SingleOutputStreamOperator<UserBehavior> watermarkStream = mapStream.assignTimestampsAndWatermarks(WatermarkStrategy
                .<UserBehavior>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {
                    @Override
                    public long extractTimestamp(UserBehavior element, long recordTimestamp) {
                        // The event's own epoch-millisecond timestamp drives event time.
                        return element.getTimestamp();
                    }
                }));

        // Keep only page-view ("pv") events, key by item id, and count each item's
        // views in a 1-hour window that slides every 5 minutes. The incremental
        // aggregate is wrapped with window metadata into an itemPvViewCount.
        SingleOutputStreamOperator<itemPvViewCount> aggregateStream = watermarkStream.filter(data -> "pv".equals(data.getBehavior()))
                .keyBy(data -> data.getItemId())
                .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
                .aggregate(new itemPVSum(), new itemPvViewResult());

        // Re-key the per-item counts by their window end time and emit the top 5
        // items of every window.
        SingleOutputStreamOperator<String> result = aggregateStream.keyBy(data -> data.windowEnd)
                .process(new topN(5));
        result.print();
        env.execute();
    }

    // Incremental (per-record) aggregate: counts the "pv" events of one item in
    // one window. Accumulator and result are both a plain Long count.
    public static class itemPVSum implements AggregateFunction<UserBehavior, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(UserBehavior value, Long accumulator) {
            return accumulator + 1L;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    // Full-window function that only wraps the pre-aggregated count with the
    // item id and the window's [start, end) bounds.
    public static class itemPvViewResult extends ProcessWindowFunction<Long, itemPvViewCount, Long, TimeWindow> {
        @Override
        public void process(Long aLong, ProcessWindowFunction<Long, itemPvViewCount, Long, TimeWindow>.Context context, Iterable<Long> elements, Collector<itemPvViewCount> out) throws Exception {
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            // elements holds exactly one value: the incremental aggregate's result.
            out.collect(new itemPvViewCount(aLong, elements.iterator().next(), start, end));
        }
    }

    // Keyed (by window end) process function that buffers every item's count for
    // a window, then sorts and emits the top-n once the window is complete.
    public static class topN extends KeyedProcessFunction<Long, itemPvViewCount, String> {
        // How many top items to report per window.
        private int n;
        // List state buffering the per-item counts of the current window-end key.
        public ListState<itemPvViewCount> itemViewCountListState;

        public topN(int n) {
            this.n = n;
        }

        // Acquire the state handle once the runtime context is available.
        @Override
        public void open(Configuration parameters) throws Exception {
            itemViewCountListState = getRuntimeContext().getListState(new ListStateDescriptor<itemPvViewCount>
                    ("item", Types.POJO(itemPvViewCount.class)));
        }

        @Override
        public void processElement(itemPvViewCount value, KeyedProcessFunction<Long, itemPvViewCount, String>.Context ctx, Collector<String> out) throws Exception {
            // Buffer this item's count until the whole window has arrived.
            itemViewCountListState.add(value);
            // FIX: register an *event-time* timer at windowEnd + 1 ms (the key is
            // the window end timestamp). The original processing-time timer used a
            // historical event timestamp as wall-clock time, so it fired
            // immediately — before all of the window's counts had been collected.
            ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() + 1);
        }

        @Override
        public void onTimer(long timestamp, KeyedProcessFunction<Long, itemPvViewCount, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
            // Copy the buffered counts into an ArrayList so they can be sorted.
            ArrayList<itemPvViewCount> list = new ArrayList<>();
            for (itemPvViewCount pvcount : itemViewCountListState.get()) {
                list.add(pvcount);
            }
            // The window is finished; its state is no longer needed.
            itemViewCountListState.clear();
            // Sort by view count, descending.
            // FIX: Long.compare avoids the int-overflow of (int)(o2.count - o1.count).
            list.sort(new Comparator<itemPvViewCount>() {
                @Override
                public int compare(itemPvViewCount o1, itemPvViewCount o2) {
                    return Long.compare(o2.count, o1.count);
                }
            });
            // Format the top-n (or fewer, if the window had fewer items).
            StringBuilder result = new StringBuilder();
            result.append("==============================\n");
            // timestamp - 1 undoes the +1 ms offset used when registering the timer.
            result.append("窗口结束时间:" + new Timestamp(timestamp - 1) + "\n");
            for (int i = 0; i < Math.min(this.n, list.size()); i++) {
                itemPvViewCount itemPvViewCount = list.get(i);
                String info = "No." + (i + 1) + " "
                        + "商品id" + itemPvViewCount.item + " "
                        + "浏览量:" + itemPvViewCount.count + "\n";
                result.append(info);
            }
            result.append("==============================\n");
            out.collect(result.toString());
        }
    }
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/taikun928/flink-kafka-master.git
git@gitee.com:taikun928/flink-kafka-master.git
taikun928
flink-kafka-master
flink-kafka-master
master

搜索帮助