文章目录
需要注意的是 flink-sql 执行时,是转化为 flink-job 提交到集群执行的,所以 flink 集群的每一台机器都要添加以下的 jar 包。
|外部|版本|jar
|------
|kafka|4.1|flink-sql-connector-kafka_2.11-1.10.2.jar
flink-json-1.10.2-sql-jar.jar
|elasticsearch|7.6|flink-sql-connector-elasticsearch7_2.11-1.10.2.jar
|mysql|5.7|flink-jdbc_2.11-1.10.2.jar
mysql-connector-java-8.0.11.jar
✍🏻作者简介:机器学习,深度学习,卷积神经网络处理,图像处理
🚀B站项目实战:https://space.bilibili.com/364224477
😄 如果文章对你有帮助的话, 欢迎评论 💬点赞👍🏻 收藏 📂加关注+
🤵♂代做需求:@个人主页
1.2 生成 kafka 数据
用户行为数据来源: 阿里云天池公开数据集
网盘:https://pan.baidu.com/s/1wDVQpRV7giIlLJJgRZAInQ 提取码:gja5
商品类目纬度数据来源: category.sql
数据生成器:datagen.py
有了数据文件之后,使用 python 读取文件数据,然后并发写入到 kafka。
修改生成器中的 kafka 地址配置,然后运行 以下命令,开始不断往 kafka 写数据
1.3 开发前的三个小 tip
使用的逻辑为:创建索引模式 》Discover (发现) 查看索引数据 》visualize(可视化)创建可视化图表》dashboards(仪表板)创建大屏,即汇总多个可视化的图表
2.1 创建 kafka 数据源表
2.2 指标统计:每小时成交量
2.2.1 创建 es 结果表, 存放每小时的成交量
2.2.2 执行 sql ,统计每小时的成交量
2.3 指标统计:每10分钟累计独立用户数
2.3.1 创建 es 结果表,存放每10分钟累计独立用户数
2.3.2 创建视图
2.3.3 执行 sql ,统计每10分钟的累计独立用户数
2.4 指标统计:商品类目销量排行
2.4.1 创建商品类目维表
先在 mysql 创建一张商品类目的维表,然后配置 flink 读取 mysql。
2.4.1 创建 es 结果表,存放商品类目排行表
2.4.2 创建视图
2.4.3 执行 sql , 统计商品类目销量排行
3.1 最终效果
整个开发过程,只用到了 flink-sql ,无需写 java 或者其它代码,就完成了这样一个实时报表。