官网
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/concepts/overview/
依赖
1 |
|
加载数据
代码中加载
1 | import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _} |
从本地文件中加载
文件
D:\bigdata_study\stu_list.txt
1 | 10010,张三,女,16,IS |
示例
1 | import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _} |
HDFS文件
创建文件夹
1 | hadoop fs -mkdir /bigdata_study |
查看 http://192.168.7.101:50070/explorer.html#/
导入文件
1 | hadoop fs -put /data/tools/bigdata/bigdata_study/stu_list.txt /bigdata_study |
依赖中添加
1 | <!--读取hadoop文件--> |
readTextFile()
通过 readTextFile() 方法可以将本地或hdfs上的文件作为数据源读入Flink
HDFS文件
1 | val inputFile = "hdfs://192.168.7.101:9000/bigdata_study/stu_list.txt" |
示例
1 | import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _} |
服务所在服务器上执行Jar
1 | # 单个NameNode |
注意
假如我们配置了Hadoop NameNode的高可用
可以通过
hdfs://hdfsns
来访问,但是实际生效的节点为hdfs://192.168.7.102:9000
如果在Hadoop所在的环境中 两种方式均可以访问,但是如果从外部访问只能选择
hdfs://192.168.7.102:9000
这种方式
创建一个YARN模式的flink集群:
1 | bash $FLINK_HOME/bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m -d |
在Yarn中执行
1 | flink run -m yarn-cluster -ynm mytest ./WordCount.jar "hdfs://192.168.7.102:9000/bigdata_study/stu_list.txt" |
Kafka中加载
1 | import java.util |
依赖
1 |
|
数据导出
导出到HDFS
1 | //写入到HDFS |
导出到文件
1 | //写入到文件 |
值转换
Flink的Transformation转换主要包括四种:
- 单数据流基本转换
- 基于Key的分组转换
- 多数据流转换
- 数据重分布转换
单数据流基本转换
map
1 | import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _} |
添加或删除属性
1 |
|
基于Key的分组转换
对数据分组主要是为了进行后续的聚合操作,即对同组数据进行聚合分析。
groupBy
会将一个DataSet
转化为一个GroupedDataSet
,聚合操作会将GroupedDataSet
转化为DataSet
。如果聚合前每个元素数据类型是T,聚合后的数据类型仍为T。
aggregation
常见的聚合操作有sum
、max
、min
等,这些聚合操作统称为aggregation。aggregation需要一个参数来指定按照哪个字段进行聚合。跟groupBy
相似,我们可以使用数字位置来指定对哪个字段进行聚合,也可以使用字段名。
与批处理不同,这些聚合函数是对流数据进行数据,流数据是依次进入Flink的,聚合操作是对之前流入的数据进行统计聚合。sum
算子的功能对该字段进行加和,并将结果保存在该字段上。min
操作无法确定其他字段的数值。
1 | val tupleStream = env.fromElements( |
第0个分组,第1个求和
结果
(1,3,8)
(0,3,2)
reduce
1 | import org.apache.flink.api.common.operators.Order |
结果
(up,1)
(day,2)
(good,2)
(study,1)
Java和Scala对比
Java
1 | import org.apache.flink.api.common.functions.FilterFunction; |
Scala
1 | import org.apache.flink.api.scala._ |
日志报错
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console
原因:
在pom.xml文件中导入了log4j的依赖:
1 | <dependency> |
但是,log4j2的配置文件并没有导入,尝试导入log4j.properties
,但并不行,需要导入log4j2.xml
解决方式
在工程的resources目录下新建一个文件:log4j2.xml
,
然后在该文件中下入以下配置信息:
1 |
|
机器学习Alink
Spark对应的机器学习框架SparkML
Flink对应的机器学习框架FlinkML/Alink
FlinkML
https://github.com/apache/flink-ml
1 | <dependency> |
Alink
Alink与SparkML算法相比,Alink算法更全面,性能更优异,场景更丰富(同时支持流批),本地化更出色(支持中文分词)是快速搭建在线机器学习系统的不二之选。
镜像仓库:Alink
教程:https://www.yuque.com/pinshu/alink_tutorial/book_java