前言
本文使用环境版本
- Hive:2.1.0
- Flink:flink-1.12.7-bin-scala_2.12
Flink Table
https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/table/
依赖
1 | <dependency> |
1 | <dependency> |
flink-stream-java和flink-stream-scala都提供了类似的API来管理Flink流; 您只需要使用其中一种语言即可,具体取决于您的语言。
请注意,无论您选择什么,诸如flink-runtime和flink-clients之类的某些依赖项都依赖于scala的版本(2.11或2.12),因为Flink基于Akka框架(用scala编写的框架)。
正在不断努力从更高级别的API flink-table(FLINK-11063)中删除akka依赖性。
1 | <dependency> |
1 | <dependency> |
读取HDFS文件
1 | <!--读取hadoop文件--> |
环境设置
Hive 需要开启元数据服务
1 | nohup hive --service metastore >/dev/null 2>&1 & |
Hive下配置
hive-site.xml
1 | <property> |
依赖
1 | <properties> |
调用代码
1 | import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableEnvironment} |
我们看一下初始化的方法
1 | public HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable String hiveConfDir, @Nullable String hadoopConfDir, @Nullable String hiveVersion) { |
其中
参数 | 必填 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
catalogName | Yes | (none) | String | catalog 的类型,创建 HiveCatalog 时必须设置为:hive |
defaultDatabase | No | default | String | 指定当前 catalog 使用的默认数据库。 |
hiveConfDir | No | (none) | String | 包含 hive-site.xml 文件的 hive 目录的 URI,而且必须是 hadoop 文件系统支持的 URI。如果使用了相对路径的 URI ,比如没有指定 schema ,则会假定指定的本地文件系统。如果没有指定该选项,flink 将会从 class path 中搜索 hive-site.xml 文件。 |
hadoopConfDir | No | (none) | String | hadoop 配置文件路径,只支持本地文件系统路径。 建议的指定 hadoop 配置文件目录的方式是将 HADOOP_CONF_DIR 配置为环境变量,比如 /etc/profile。 只有在环境变量不起作用时,使用该参数,比如你想分别配置每个 HiveCatalog 环境。 |
hiveVersion | No | (none) | String | HiveCatalog 有能力自动推断使用的 hive 版本,建议不要指定 hive 版本号,除非自动推断失败。 |
使用本地配置文件
把hive-site.xml
放在项目下
1 | object WordCount { |
插入数据
1 | val sql2 = "INSERT INTO t_user4(id,name) SELECT id, name FROM t_user" |
判断传入参数
1 | if(args==null || args.length!=1){ |
Table和TableResult
- Table 可以调用计算处理相关方法 Table调用execute返回TableResult
- TableResult 可以用来打印
1 | //返回Table |
table语法示例
1 | val selectTables_sql3 = "select * from t_user4" |
FlinkSQL Cli集成Hive
Flink 支持以下 Hive 版本。
1.0.x(1.0.0、1.0.1),1.1.x(1.1.0、1.1.1),1.2.x(1.2.0、1.2.1、1.2.2)
2.0.x(2.0.0、2.0.1),2.1.x(2.1.0、2.1.1),2.2.0,2.3.x(2.3.0、2.3.1、2.3.2、2.3.4、2.3.5、2.3.6)
3.1.x(3.1.0、3.1.1、3.1.2)
环境变量
Apache Hive 是基于 Hadoop 之上构建的, 所以还需要 Hadoop 的依赖,配置好HADOOP_CLASSPATH即可。
这一点非常重要,否则在使用FlinkSQL Cli查询Hive中的表时,会报如下错误:
java.lang.ClassNotFoundException: org.apache.hadoop.mapred.JobConf
添加HADOOP_CLASSPATH
1 | vi /etc/profile.d/hadoop.sh |
内容
1 | #HADOOP_HOME |
配置生效
1 | source /etc/profile |
测试
1 | echo $HADOOP_CLASSPATH |
添加Jar
注意
添加这些jar会和自己开发打的jar包冲突,所以,如果没有服务器上使用FlinkSQL的需求的化,不用添加下面的Jar包。
Flink1.12.7
集成Hive2.1.0
只需要在Flink的lib中添加如下三个jar包
以Hive2.1.0为例,分别为:
(1) flink-sql-connector-hive-2.2.0_2.12-1.12.7.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.2.0_2.12/1.12.7/
查看Metastore Version
1 | select * from VERSION; |
这个版本和Hive的版本一致
对应关系
Metastore version | Maven dependency |
---|---|
1.0.0 - 1.2.2 | flink-sql-connector-hive-1.2.2 |
2.0.0 - 2.2.0 | flink-sql-connector-hive-2.2.0 |
2.3.0 - 2.3.6 | flink-sql-connector-hive-2.3.6 |
3.0.0 - 3.1.2 | flink-sql-connector-hive-3.1.2 |
(2) flink-connector-hive_2.12-1.12.7.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.12.7/
(3) hive-exec-2.1.0.jar
Hive安装路径下的lib文件夹
1 | cp $HIVE_HOME/lib/hive-exec-2.1.0.jar $FLINK_HOME/lib/ |
配置
Hive下配置
hive-site.xml
1 | <property> |
Flink下配置
sql-client-defaults.yaml
该文件时Flink SQL Cli启动时使用的配置文件,该文件位于Flink安装目录的conf/文件夹下,具体的配置如下,主要是配置catalog:
1 | catalogs: [] # empty list |
修改为
1 | catalogs: |
开启元数据服务元数据服务
Hive 需要开启元数据服务
1 | nohup hive --service metastore >/dev/null 2>&1 & |
Hive中创建表
进入Hive客户端
1 | hive |
输入
1 | create table t_user(id int,name string,password string); |
退出
1 | exit; |
FlinkSQL Cli操作Hive
基本操作
首先启动FlinkSQL Cli,命令如下:
1 | $FLINK_HOME/bin/sql-client.sh embedded |
接下来,我们可以查看注册的catalog
1 | show catalogs; |
结果
default_catalog
myhive
使用注册的myhive catalog
1 | use catalog myhive; |
FlinkSQL操作Hive中的表,比如查询,写入数据。
1 | show tables; |
退出
1 | exit; |
运行报错
java.lang.RuntimeException: The Yarn application application_1667981758965_0021 doesn’t run anymore
修改yarn-site.xml配置文件,原因是可能内存超过虚拟内存的限制,所以需要对yarn进行虚拟内存限制修正,将如下两个配置改为false
1 | <property> |
show
1 | -- 列出catalog |
create
创建库
1 | CREATE DATABASE zdb; |
创建表
CREATE 语句用于向当前或指定的 Catalog 中注册表、视图或函数。注册后的表、视图和函数可以在 SQL 查询中使用。
1 | CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name |
drop
DROP 语句可用于删除指定的 catalog,也可用于从当前或指定的 Catalog 中删除一个已经注册的表、视图或函数。
1 | --删除表 |
alter
ALTER 语句用于修改一个已经在 Catalog 中注册的表、视图或函数定义。
1 | --修改表名 |
insert
INSERT 语句用来向表中添加行(INTO是追加,OVERWRITE是覆盖)
1 | -- 1. 插入别的表的数据 |
三种建表的方式
1.基于DataSteam流来创建
注:此时先创建的一个Table对象,如果使用Table APi操作的话就可以直接操作了,如果要使用Sql的方式则需要先注册成一个view然后再操作
关键语句
1 | Table tableApi = tableEnv.fromDataStream(mapDataStream); |
举例
1 | public static void main(String[] args) throws Exception { |
2. 基于connect+withFormat+withSchema方式
注:此时是先注册成一个view,如果使用SQL操作的话就可以直接操作了,如果要使用Api的方式则需要使用from语句获得Table对象
关键语句
1 | tableEnv.connect(new FileSystem().path(filePath)) |
举例
1 | String filePath = "D:\\workspace21\\myflink\\src\\main\\resources\\sensors.txt"; |
从kafka读取数据转换成表
1 | import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation} |
3.基于DDL建表语句,就是Create Table方式
注:此时和方式2一样,是先注册成一个view,如果使用SQL操作的话就可以直接操作了,如果要使用Api的方式则需要使用from语句获得Table对象
关键
1 | tableEnv.sqlUpdate(sinkDDL) |
举例
1 | val sinkDDL = "create table if not exists t_user2 (id varchar(20), name string)" |